JUC 并发容器与工具

发布时间:2026/7/5 2:12:06
JUC 并发容器与工具 JUC — — java.util.concurrentjava的并发容器与工具接下来列出一些工作中高频使用的不列冷门类。一、并发CollectionConcurrentHashMap数组 链表 红黑树可以看出底层数据结构和HashMap是一致的。初始容量16每个数据发生数据冲突后挂在数据后面形成链表。挂的多了链表长度 ≥ 8且数组长度 ≥ 64时链表转换为红黑树增加查询效率 O(log n)两者数据结构基本一样但是并发安全的处理上天差地别。以jdk 8为准// HashMap整个数组加锁 synchronized (map) { map.put(key, value); // 其他线程都不能put和get } // ConcurrentHashMap只锁当前桶 final V putVal(K key, V value, boolean onlyIfAbsent) { // 桶为空 → CAS 无锁插入 if (tabAt(tab, i) null) { casTabAt(tab, i, null, newNode); // 无锁 } else { // 桶非空 → 锁住桶的头节点 synchronized (f) { // 在链表或红黑树中插入 } } }操作HashMapConcurrentHashMap写操作put全表锁整个数组锁单个桶链表头或红黑树根节点读操作get无锁但线程不安全无锁volatile 保证可见性HashMap是一间只有一个门的房间一次只能进一个人ConcurrentHashMap是一间有无数个隔间的仓库每个人可以进自己的隔间互不打扰。使用案例本地缓存场景缓存从数据库查询的商品信息避免频繁查库。public class ProductCache { // 商品ID - 商品信息的映射 private final ConcurrentHashMapLong, Product cache new ConcurrentHashMap(); public Product getProduct(Long productId) { // 1. 先查缓存 Product product cache.get(productId); if (product ! null) { return product; } // 2. 缓存未命中查数据库 product queryFromDB(productId); if (product ! null) { cache.put(productId, product); // 放入缓存 } return product; } // 商品信息变更时更新缓存 public void updateProduct(Product product) { cache.put(product.getId(), product); } // 商品下架时移除缓存 public void removeProduct(Long productId) { cache.remove(productId); } private Product queryFromDB(Long productId) { // 模拟数据库查询 return new Product(productId, 商品 productId, 100.0); } }CopyOnWriteArrayList写时复制读无锁写加锁全量复制适用于CopyOnWriteArrayList的情况。public class CopyOnWriteArrayListE implements ... { /** 底层存储永远是 volatile 数组 */ private transient volatile Object[] array; /** 独占锁保护写操作 */ final transient ReentrantLock lock new ReentrantLock(); }public boolean add(E e) { final ReentrantLock lock this.lock; // ★ 1. 加锁 —— 同一时间只允许一个线程写 lock.lock(); try { Object[] elements getArray(); // 拿到旧数组 int len elements.length; // ★ 2. 写时复制 —— 拷贝一份新数组长度1 Object[] newElements Arrays.copyOf(elements, len 1); // ★ 3. 在新数组上修改 newElements[len] e; // ★ 4. 原子替换 —— volatile write读线程立即可见 setArray(newElements); return true; } finally { lock.unlock(); // 释放锁 } }使用案例黑白名单过滤器场景网关服务需要维护一个动态的黑名单IP列表管理员可以随时增删但每次请求都要检查。Component public class IpBlacklistFilter { // 黑名单列表读多写极少 private final CopyOnWriteArrayListString blacklist new CopyOnWriteArrayList(); PostConstruct public void init() { // 启动时加载初始黑名单 blacklist.addAll(Arrays.asList(192.168.1.100, 10.0.0.50)); } /** * 每次请求都会调用读操作无锁性能极高 */ public boolean isBlocked(String ip) { return blacklist.contains(ip); // 读操作不加锁 } /** * 管理员手动添加黑名单写操作较少 */ public void addToBlacklist(String ip) { blacklist.addIfAbsent(ip); // 不存在才添加 System.out.println(黑名单添加: ip); } /** * 管理员手动移除黑名单 */ public void removeFromBlacklist(String ip) { blacklist.remove(ip); System.out.println(黑名单移除: ip); } /** * 获取当前黑名单列表遍历快照 */ public ListString getBlacklist() { return new ArrayList(blacklist); // 返回快照避免外部修改 } } // 使用 RestController public class GatewayController { Autowired private IpBlacklistFilter blacklistFilter; GetMapping(/api/**) public ResponseEntity? handleRequest(HttpServletRequest request) { String clientIp request.getRemoteAddr(); if (blacklistFilter.isBlocked(clientIp)) { return ResponseEntity.status(403).body(Forbidden); } // 正常处理请求 return ResponseEntity.ok(Success); } }二、并发工具CountDownLatchCountDownLatch是 Java 并发包中的一个倒计数器同步工具它允许一个或多个线程等待直到其他线程完成一组操作。核心概念计数器初始值设置一个正数的计数await()调用此方法会阻塞等待直到计数器变成0countDown()计数器不断减1工作原理图示初始化: CountDownLatch latch new CountDownLatch(3) 主线程调用 latch.await() → 阻塞等待 线程1 完成任务 → latch.countDown() // 计数器: 3 → 2 线程2 完成任务 → latch.countDown() // 计数器: 2 → 1 线程3 完成任务 → latch.countDown() // 计数器: 1 → 0 计数器归零 → 主线程被唤醒 → 继续执行后续代码案例线程池创建任务使用计数器保证所有线程完成后继续执行后续代码public class SimpleCountDownLatchDemo { public static void main(String[] args) throws InterruptedException { int taskCount 5; CountDownLatch latch new CountDownLatch(taskCount); ExecutorService executor Executors.newFixedThreadPool(10); for (int i 0; i taskCount; i) { int taskId i; executor.execute(() - { try { System.out.println(任务 taskId 执行中...); Thread.sleep(1000); System.out.println(任务 taskId 完成); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 每个任务完成时计数器减1 } }); } System.out.println(主线程等待所有任务完成...); latch.await(); // 阻塞直到计数器归零 System.out.println(所有任务完成主线程继续执行); executor.shutdown(); } }SemaphoreSemaphore 本质上就是基于AQSAbstractQueuedSynchronizer的一个共享锁实现。Semaphore(3) │ ▼ AQS.state 3 ← 许可证数量保存在 state 中 │ ├── acquire() → CAS 尝试将 state 减 1 │ ├── 成功state 0→ 获得许可继续执行 │ └── 失败state 0 → 进入 CLH 队列阻塞等待 │ └── release() → CAS 将 state 加 1 └── 唤醒队列中等待的第一个线程源码简化// acquire() 的底层逻辑 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); // 调用 AQS 的共享模式获取 } // AQS 内部 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (tryAcquireShared(arg) 0) { // 尝试获取返回负数表示失败 doAcquireSharedInterruptibly(arg); // 失败则入队阻塞 } } // Semaphore 的非公平实现 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available getState(); // 当前可用许可数 int remaining available - acquires; if (remaining 0 || // 许可不够 compareAndSetState(available, remaining)) { // CAS 更新 return remaining; } } } // release() 的底层逻辑 protected final boolean tryReleaseShared(int releases) { for (;;) { int current getState(); int next current releases; if (compareAndSetState(current, next)) { // CAS 增加许可 return true; } } }本质就是用cas保证并发的情况下使用aqs的阻塞队列等待state是volatile保证线程可见性的当state 0时不许可当state 0时可以执行。限流这种需求很重要是控制并发请求数的以下为Semaphore的案例public class ApiRateLimiter { private final Semaphore semaphore; public ApiRateLimiter(int maxConcurrentRequests) { this.semaphore new Semaphore(maxConcurrentRequests); } public void handleRequest(String requestId) { // 尝试获取许可获取不到直接返回失败 if (!semaphore.tryAcquire()) { System.out.println(请求 requestId 被限流返回 503); return; } try { System.out.println(请求 requestId 开始处理当前并发 (maxConcurrent - semaphore.availablePermits())); // 实际的业务处理 Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { semaphore.release(); System.out.println(请求 requestId 处理完成); } } public static void main(String[] args) { ApiRateLimiter limiter new ApiRateLimiter(3); // 最多3个并发 // 模拟10个并发请求 for (int i 0; i 10; i) { int id i; new Thread(() - limiter.handleRequest(req- id)).start(); } } }案例2数据库连接池更真实的限流public class DatabasePool { private final Semaphore semaphore; private final int maxConnections; public DatabasePool(int maxConnections) { this.maxConnections maxConnections; this.semaphore new Semaphore(maxConnections, true); // 公平模式 } public void executeQuery(String sql) { boolean acquired false; try { // 尝试获取连接最多等2秒 acquired semaphore.tryAcquire(2, TimeUnit.SECONDS); if (!acquired) { System.out.println(获取连接超时请稍后重试); return; } // 模拟执行SQL System.out.println(Thread.currentThread().getName() 执行: sql 活跃连接数: (maxConnections - semaphore.availablePermits())); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (acquired) { semaphore.release(); } } } public static void main(String[] args) { DatabasePool pool new DatabasePool(3); // 最多3个数据库连接 // 模拟20个查询请求 for (int i 0; i 20; i) { int queryId i; new Thread(() - pool.executeQuery(SELECT * FROM orders WHERE id queryId)).start(); } } }案例3文件上传限流控制磁盘IO并发public class FileUploadLimiter { private final Semaphore semaphore; public FileUploadLimiter(int maxConcurrentUploads) { this.semaphore new Semaphore(maxConcurrentUploads); } public void uploadFile(String fileName, byte[] data) { try { semaphore.acquire(); System.out.println(开始上传: fileName); // 模拟文件写入 try (FileOutputStream fos new FileOutputStream(new File(fileName))) { fos.write(data); Thread.sleep(2000); // 模拟上传耗时 } System.out.println(上传完成: fileName); } catch (Exception e) { System.out.println(上传失败: fileName); } finally { semaphore.release(); } } public static void main(String[] args) { FileUploadLimiter limiter new FileUploadLimiter(2); // 最多同时上传2个文件 for (int i 0; i 10; i) { int fileId i; new Thread(() - limiter.uploadFile(file_ fileId .txt, data.getBytes())).start(); } } }到目前为止我们学的所有并发知识——锁、CAS、AQS、并发容器——解决的都是线程之间抢资源的问题。这些问题的特点是冲突发生在 CPU 内部速度极快。一个 CAS 操作几十纳秒一个锁的获取和释放几微秒一个线程上下文切换几微秒这些开销虽然存在但对于业务系统来说基本可以接受。你写十个线程抢一个AtomicInteger性能损失微乎其微。当线程遇到 I/O 时画风突变。来看一组数据操作耗时比例CPU 执行一条指令0.3 纳秒1x一次内存访问100 纳秒300x一次 SSD 随机读10 微秒30,000x一次网络请求同机房0.5 毫秒1,500,000x一次数据库查询10 毫秒30,000,000x一次磁盘寻道10 毫秒30,000,000x一个数据库查询的时间足够 CPU 执行 3000 万条指令。这意味着什么意味着如果你的线程在等数据库返回结果它基本上就是在睡觉——而且是深度睡眠。下一站BIO、NIO、AIO当线程遇上 I/O才是并发真正的战场。