Java 并发容器详解
引言并发容器是java.util.concurrent包的核心组件专为多线程环境设计替代了传统的Collections.synchronizedXxx()同步包装器。本文将深入讲解三类最常用的并发容器。一、为什么需要并发容器1.1 同步容器的局限// 同步容器所有方法加 synchronizedListStringlistCollections.synchronizedList(newArrayList());// 问题 1复合操作不原子// ❌ 先检查再操作中间可能被其他线程修改if(!list.contains(key)){list.add(key);// 可能重复添加}// 问题 2迭代时需手动加锁// ❌ ConcurrentModificationExceptionfor(Strings:list){// 其他线程修改 list → 异常}// ✅ 必须手动加锁synchronized(list){for(Strings:list){// 安全}}1.2 并发容器的优势优势说明细粒度锁分段锁或 CAS减少锁竞争弱一致性迭代迭代时不抛 ConcurrentModificationException原子复合操作putIfAbsent / computeIfAbsent 等无锁读取ConcurrentHashMap 读操作不需要锁二、ConcurrentHashMap2.1 数据结构演进Java 7分段锁Segment┌─────────────────────────────────────────────┐ │ Segment[0] │ Segment[1] │ ... │ Segment[15]│ │ ┌─────────┐ │ ┌─────────┐│ │ ┌─────────┐│ │ │ HashEntry│ │ │ HashEntry││ │ HashEntry││ │ │ 链表 │ │ │ 链表 ││ │ 链表 ││ │ └─────────┘ │ └─────────┘│ │ └─────────┘│ │ 锁1 │ 锁2 │ │ 锁16 │ └─────────────────────────────────────────────┘ 每个 Segment 一把锁默认 16 个 Segment 并发度 Segment 数量Java 8CAS synchronized细粒度┌──────────────────────────────────────────────┐ │ Node[] table │ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │ │ N │ N │ N │ N │ N │ N │ N │ N │ │ │ └────┴────┴─┬──┴────┴────┴────┴────┴────┘ │ │ │ │ │ ┌────┴────┐ │ │ │ 链表/红黑树│ │ │ └─────────┘ │ │ │ │ 锁粒度每个桶数组位置一把 synchronized 锁 │ │ 读操作无锁volatile CAS │ │ 写操作锁单个桶 │ └──────────────────────────────────────────────┘2.2 核心操作ConcurrentHashMapString,IntegermapnewConcurrentHashMap();// 基本操作与 HashMap 类似map.put(a,1);map.get(a);// 1map.remove(a);map.size();map.containsKey(a);// 原子复合操作map.putIfAbsent(b,2);// 不存在才放入map.remove(b,2);// 键值都匹配才删除map.replace(b,2,3);// 旧值匹配才替换map.computeIfAbsent(c,k-3);// 不存在则计算并放入map.computeIfPresent(c,(k,v)-v1);// 存在则计算替换map.compute(d,(k,v)-vnull?1:v1);// 无论如何计算map.merge(e,1,Integer::sum);// 存在则合并不存在则放入2.3 computeIfAbsent 的妙用// 场景缓存计算结果ConcurrentHashMapString,ExpensiveResultcachenewConcurrentHashMap();// ❌ 非原子操作if(!cache.containsKey(key)){cache.put(key,expensiveCompute(key));// 可能重复计算}// ✅ 原子操作ExpensiveResultresultcache.computeIfAbsent(key,k-expensiveCompute(k));// ✅ 映射到子 Map多级缓存ConcurrentHashMapString,MapString,IntegermultiMapnewConcurrentHashMap();multiMap.computeIfAbsent(group1,k-newConcurrentHashMap()).put(item1,100);2.4 批量操作Java 8// forEachmap.forEach(1,(k,v)-System.out.println(kv));// 第一个参数是并行度// search找到第一个非 null 的结果Stringfoundmap.search(1,(k,v)-v10?k:null);// reduceIntegersummap.reduceValues(1,Integer::sum);2.5 弱一致性// ConcurrentHashMap 的迭代器是弱一致性的// - 不会抛 ConcurrentModificationException// - 可能反映也可能不反映迭代开始后的修改// - 适用于最终一致可接受的场景ConcurrentHashMapString,IntegermapnewConcurrentHashMap();map.put(a,1);map.put(b,2);// 迭代过程中其他线程可以修改for(Map.EntryString,Integerentry:map.entrySet()){// 不会抛异常但可能看不到最新的修改}2.6 常见陷阱① size() 和 isEmpty() 是近似值// ConcurrentHashMap 的 size() 是各桶的近似求和// 在并发修改时可能不准确intsizemap.size();// 近似值不保证精确② key/value 不能为 nullmap.put(null,1);// ❌ NullPointerExceptionmap.put(key,null);// ❌ NullPointerException③ computeIfAbsent 中不要修改自身 Map// ❌ 死锁 / 无限循环Java 8 已部分修复但仍不推荐map.computeIfAbsent(a,k-{map.put(b,2);// 不要在计算函数中修改同一个 mapreturn1;});三、CopyOnWriteArrayList3.1 核心原理写操作复制整个数组 → 修改副本 → 替换引用 读操作直接读取无锁无等待 ┌──────────────────────────────────────────┐ │ 读线程1 → array ──→ [A, B, C, D, E] │ │ 读线程2 → array ──→ [A, B, C, D, E] │ │ │ │ 写线程add(F) │ │ 1. 复制 → [A, B, C, D, E, F] │ │ 2. 替换引用 → array 指向新数组 │ │ 3. 旧数组由 GC 回收 │ └──────────────────────────────────────────┘3.2 基本操作CopyOnWriteArrayListStringlistnewCopyOnWriteArrayList();list.add(a);// 复制添加list.add(0,b);// 复制插入list.set(0,c);// 复制替换list.remove(a);// 复制删除list.get(0);// 无锁读取list.size();// 无锁读取list.contains(a);// 无锁遍历3.3 适用场景读远多于写如 99:1 ├── 配置列表 ├── 监听器列表Listener ├── 白名单/黑名单 └── 数据字典 不适合的场景 ├── 写操作频繁 → 每次复制开销巨大 ├── 列表很大 → 复制耗时 内存压力 └── 需要强一致性 → 读写可能不一致3.4 CopyOnWriteArraySet// 基于 CopyOnWriteArrayList 实现的 Set// add 时遍历检查重复性能更差CopyOnWriteArraySetStringsetnewCopyOnWriteArraySet();set.add(a);set.add(b);set.contains(a);// O(n) 遍历3.5 注意事项注意说明写操作开销大每次写都复制整个数组O(n)内存占用写期间同时存在两个数组最终一致性迭代器使用创建时的快照不反映后续修改不支持 listIterator迭代器不支持 add/set/remove四、BlockingQueue4.1 核心接口publicinterfaceBlockingQueueEextendsQueueE{// 抛异常booleanadd(Ee);// 队列满抛 IllegalStateExceptionEremove();// 队列空抛 NoSuchElementException// 返回布尔booleanoffer(Ee);// 队列满返回 falseEpoll();// 队列空返回 null// 阻塞等待voidput(Ee)throwsInterruptedException;// 队列满则阻塞等待Etake()throwsInterruptedException;// 队列空则阻塞等待// 超时等待booleanoffer(Ee,longtimeout,TimeUnitunit);Epoll(longtimeout,TimeUnitunit);}4.2 四种操作行为操作抛异常返回值阻塞超时插入add()offer()put()offer(timeout)移除remove()poll()take()poll(timeout)检查element()peek()--4.3 七种 BlockingQueue 实现实现数据结构有界特点ArrayBlockingQueue数组✅ 有界公平/非公平可选LinkedBlockingQueue链表可选默认无界吞吐量通常高于 ArrayPriorityBlockingQueue堆无界按优先级出队SynchronousQueue无-不存储元素直接交付DelayQueue堆无界元素到期才能出队LinkedTransferQueue链表无界transfer 直接交付给消费者LinkedBlockingDeque双向链表可选双端队列4.4 ArrayBlockingQueue// 有界阻塞队列创建时必须指定容量BlockingQueueTaskqueuenewArrayBlockingQueue(1000);// 公平模式等待最久的消费者/生产者优先BlockingQueueTaskfairQueuenewArrayBlockingQueue(1000,true);// 生产者queue.put(newTask());// 队列满时阻塞queue.offer(newTask(),5,SECONDS);// 超时返回 false// 消费者Tasktaskqueue.take();// 队列空时阻塞Tasktaskqueue.poll(5,SECONDS);// 超时返回 null4.5 生产者-消费者模式publicclassProducerConsumer{privatefinalBlockingQueueStringqueuenewArrayBlockingQueue(100);publicvoidstart(){newThread(this::producer).start();newThread(this::consumer).start();}voidproducer(){try{while(true){StringdatafetchData();queue.put(data);// 队列满时自动阻塞}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}voidconsumer(){try{while(true){Stringdataqueue.take();// 队列空时自动阻塞processData(data);}}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}4.6 线程池 BlockingQueue// 自定义线程池使用有界队列ThreadPoolExecutorexecutornewThreadPoolExecutor(10,20,60,SECONDS,newArrayBlockingQueue(500),// 有界队列newNamedThreadFactory(worker),newCallerRunsPolicy()// 队列满时由调用者线程执行);4.7 DelayQueue —— 延迟队列// 元素必须实现 Delayed 接口publicclassDelayedTaskimplementsDelayed{privatefinalStringname;privatefinallongexecuteTime;publicDelayedTask(Stringname,longdelayMs){this.namename;this.executeTimeSystem.currentTimeMillis()delayMs;}OverridepubliclonggetDelay(TimeUnitunit){returnunit.convert(executeTime-System.currentTimeMillis(),MILLISECONDS);}OverridepublicintcompareTo(Delayedo){returnLong.compare(this.executeTime,((DelayedTask)o).executeTime);}}// 使用DelayQueueDelayedTaskdelayQueuenewDelayQueue();delayQueue.put(newDelayedTask(task1,5000));// 5秒后执行delayQueue.put(newDelayedTask(task2,3000));// 3秒后执行DelayedTasktaskdelayQueue.take();// 阻塞直到最早的任务到期4.8 SynchronousQueue —— 零容量队列// 不存储元素生产者必须等待消费者接收// 适用于直接交付场景CachedThreadPool 使用BlockingQueueStringqueuenewSynchronousQueue();// 线程1生产者queue.put(data);// 阻塞直到有消费者来取// 线程2消费者Stringdataqueue.take();// 阻塞直到有生产者放入五、其他并发容器5.1 ConcurrentSkipListMap / ConcurrentSkipListSet// 基于跳表的有序并发 Map/Set// 插入/删除/查找 O(log n)ConcurrentSkipListMapString,IntegersortedMapnewConcurrentSkipListMap();sortedMap.put(c,3);sortedMap.put(a,1);sortedMap.put(b,2);// 自然排序a1, b2, c3ConcurrentSkipListSetStringsortedSetnewConcurrentSkipListSet();5.2 ConcurrentLinkedQueue / ConcurrentLinkedDeque// 无界非阻塞队列基于 CAS 实现// 适合高并发场景但不支持阻塞操作ConcurrentLinkedQueueStringqueuenewConcurrentLinkedQueue();queue.offer(a);queue.offer(b);Stringheadqueue.poll();// a非阻塞六、选型指南场景推荐原因高并发键值存储ConcurrentHashMap分段锁/CAS读无锁缓存计算结果ConcurrentHashMap computeIfAbsent原子复合操作读多写少列表CopyOnWriteArrayList写时复制读无锁监听器管理CopyOnWriteArrayList注册/注销少遍历多生产者-消费者ArrayBlockingQueue有界 阻塞任务调度DelayQueue延迟出队有序并发 MapConcurrentSkipListMap跳表 CAS高并发无阻塞队列ConcurrentLinkedQueueCAS无锁总结容器核心机制读性能写性能一致性ConcurrentHashMapCAS synchronized优良弱一致CopyOnWriteArrayList写时复制优差快照一致ArrayBlockingQueueReentrantLock-良强一致ConcurrentSkipListMapCAS 跳表良良弱一致

相关新闻