在 Java 1.5 之前,开发者处理并发同步的唯一武器是 synchronized 关键字。虽然它简单、安全,且随着 JVM 的优化(偏向锁、轻量级锁等)在性能上已表现出色,但它本质上是一辆 “自动挡汽车”:虽然好开,却无法实现精准的制动、复杂的坡道起步或是极速下的换挡操控。
随着业务场景复杂度的飙升——我们需要锁的公平性、需要超时放弃机制、需要可中断的等待,以及在海量读取场景下实现更细粒度的读写分离。于是,java.util.concurrent.locks 体系应运而生。
体系介绍 AQS:万锁之魂 AQS (AbstractQueuedSynchronizer) 是支撑整个并发大厦的钢筋骨架。它巧妙地利用了一个 volatile 修饰的 state 变量和一个基于 CHL 锁(一种公平自旋锁)的双向链表,优雅地解决了线程排队、阻塞与唤醒的难题。理解了 AQS,就等于掌握了 JUC 体系的底层密码。
显式锁:掌控力的重塑 显式锁不再依赖 JVM 隐式的指令拦截,而是通过 Java 代码构建了一套完整的同步协议。它将 “进入临界区” 和 “离开临界区” 的控制权彻底交还给了开发者。
ReentrantLock :赋予了我们尝试拿锁(tryLock)和响应中断的能力。
ReentrantReadWriteLock :通过“读写分离”大幅提升了高并发读场景下的吞吐量。
StampedLock :以 “乐观读” 的黑科技,将无锁编程的思想引入了锁的体系。
同步器:协作的艺术 除了互斥,并发编程的另一大主题是 “协作”。JUC 提供的同步器组件:
CountDownLatch :发令枪(不可复用)。
CyclicBarrier :循环栅栏(可复用)。
Semaphore :车位管理员。
Exchanger :接头人。
Phaser :移相器。
LockSupport :线程阻塞原语,AQS 的底层核心。
本文将带你撕开 AQS 的神秘面纱,从底层源码到生产实战,深度剖析显式锁与同步器是如何在多核时代,通过精准的节律控制,将乱序的线程编织成高效执行的交响乐。
AQS AQS 是整个并发包 juc 的灵魂,它为构建锁(Lock)和同步器(Synchronizer)提供了一套通用的多线程排队与状态管理框架。在 AbstractQueuedSynchronizer (AQS)中,所有复杂的同步逻辑最终都坍缩为两件事:
对 state 的状态维护。
对 Node(head/tail)双向链表的精细操作。
AQS 的地基 AQS 的真正 “地基” 是由 volatile state、CAS (Unsafe) 和 LockSupport 构成的 “三驾马车” 以及 Java 层面的的 CLH 变体双向链表。它平衡了多核 CPU 的缓存可见性与操作系统线程调度的沉重开销,是 Java 并发编程史上最牛逼的工业设计之一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 private static final Unsafe U = Unsafe.getUnsafe();private static final long STATE = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state" );private static final long HEAD = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head" );private static final long TAIL = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail" );static { Class<?> ensureLoaded = LockSupport.class; } abstract static class Node { volatile Node prev; volatile Node next; Thread waiter; volatile int status; final boolean casPrev (Node c, Node v) { return U.weakCompareAndSetReference(this , PREV, c, v); } final boolean casNext (Node c, Node v) { return U.weakCompareAndSetReference(this , NEXT, c, v); } final int getAndUnsetStatus (int v) { return U.getAndBitwiseAndInt(this , STATUS, ~v); } final void setPrevRelaxed (Node p) { U.putReference(this , PREV, p); } final void setStatusRelaxed (int s) { U.putInt(this , STATUS, s); } final void clearStatus () { U.putIntOpaque(this , STATUS, 0 ); } private static final long STATUS = U.objectFieldOffset(Node.class, "status" ); private static final long NEXT = U.objectFieldOffset(Node.class, "next" ); private static final long PREV = U.objectFieldOffset(Node.class, "prev" ); } static final class ExclusiveNode extends Node { } static final class SharedNode extends Node { } static final class ConditionNode extends Node implements ForkJoinPool .ManagedBlocker { ConditionNode nextWaiter; public final boolean isReleasable () { return status <= 1 || Thread.currentThread().isInterrupted(); } public final boolean block () { while (!isReleasable()) LockSupport.park(); return true ; } } private transient volatile Node head;private transient volatile Node tail;private volatile int state;protected final int getState () { return state; }protected final void setState (int newState) { state = newState; }protected final boolean compareAndSetState (int expect, int update) { return U.compareAndSetInt(this , STATE, expect, update); }
AQS 工作流程的形象拆解 我们可以把 AQS 的工作流程拆解为四个极简的步骤。
Step1:核心大管家 State(那把唯一的钥匙) :
state = 0:钥匙在桌上,谁都能抢。
state = 1:钥匙被某人拿走了。
state > 1:那个人不仅拿了钥匙,还怕弄丢,又给自己多套了几层(重入锁)。
它是怎么抢的?线程会用 CAS(原子操作)去改这个 state。就像几个人同时伸手抓钥匙,但物理定律保证只有一个人能抓到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final boolean compareAndSetState (int expect, int update) { return U.compareAndSetInt(this , STATE, expect, update); }
让我们看看 ReentrantLock 是怎么调用这个方法的。这是非公平锁(NonfairSync)尝试拿锁的代码片段:
1 2 3 4 5 6 7 8 9 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); }
Step2:CLH 候车大厅(那个排队的双向链表)
如果没抢到钥匙怎么办?AQS 会把你领到一个双向链表队列里排队。这就是你看到的 Node head 和 Node tail。入队流程:
你(线程)被打包成一个 Node;
你走到队尾,拍拍前一个人的肩膀(prev 指向他),说:“兄弟,我排你后面。”
前一个人也转头确认一下(next 指向你)。
这个队列被设计成双向的,这样的话,如果你排队排一半不想排了(超时或被中断),你可以告诉前后的兄弟:“我撤了,你们直接连上吧。”
在 AQS 中,入队是自荐(CAS 抢 tail),出队是上位(抢锁成功后替代 head)。
入队:新节点通过 casTail 实现多线程下的有序排队,prev 链接是入队的通行证。
阻塞:入队后,线程在 for(;;) 循环中通过 LockSupport.park 闭目养神。
出队:前驱释放锁并 unpark 后继。后继苏醒,抢锁成功,通过 head = node 完成出队,老 head 被 GC 回收。
入队流程 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final void enqueue (Node node) { if (node != null ) { for (;;) { Node t = tail; node.setPrevRelaxed(t); if (t == null ) tryInitializeHead(); else if (casTail(t, node)) { t.next = node; if (t.status < 0 ) LockSupport.unpark(node.waiter); break ; } } } }
获取锁过程中的入队:acquire 里的隐式入队
1 2 3 4 5 6 7 8 9 10 11 12 else if (pred == null ) { node.waiter = current; Node t = tail; node.setPrevRelaxed(t); if (t == null ) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null ); else t.next = node; }
出队流程:acquire 成功后的 “晋升”。在 AQS 中,并没有一个专门叫 dequeue 的方法。所谓 “出队”,其实是排在队首的节点抢锁成功后,将自己设为新的 head,并断开与旧 head 的连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 if (acquired) { if (first) { node.prev = null ; head = node; pred.next = null ; node.waiter = null ; } return 1 ; }
释放唤醒:signalNext(Node h)
1 2 3 4 5 6 7 private static void signalNext (Node h) { Node s; if (h != null && (s = h.next) != null && s.status != 0 ) { s.getAndUnsetStatus(WAITING); LockSupport.unpark(s.waiter); } }
Step3:阻塞逻辑 - LockSupport(闭目养神)
线程在 AQS 里的生活就像是:“醒了抢锁 -> 抢不到继续睡 -> 睡醒了再抢”。这种机制被称为 “自旋 + 阻塞” 的混合模式,兼顾了低延迟和低 CPU 损耗。
排好队后,线程不会一直盯着窗口看(那太累了,浪费 CPU),而是会进入 “挂起” 状态。逻辑:
你会问前一个人:“你是老大(Head)吗?”
如果是,你会最后尝试抢一次钥匙(万一老大办完事刚走呢)。
如果抢不到,你就在自己位子上坐下,闭上眼睛睡觉(LockSupport.park()),把自己的状态设为 WAITING。
阻塞逻辑的核心源码:这段逻辑嵌套在 acquire 方法的 for(;;) 循环中。请看 Java 17 的具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 else if (node.status == 0 ) { node.status = WAITING; } else { long nanos; spins = postSpins = (byte )((postSpins << 1 ) | 1 ); if (!timed) LockSupport.park(this ); else if ((nanos = time - System.nanoTime()) > 0L ) LockSupport.parkNanos(this , nanos); else break ; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break ; }
新建节点:status = 0。
准备阻塞:线程在 acquire 循环里执行 node.status = WAITING。
进入睡眠:执行 LockSupport.park()。
被唤醒:执行 node.clearStatus(),状态回滚到 0。
出现意外:如果等待过程中被中断,执行 node.status = CANCELLED。
Step4:唤醒逻辑 - 叫号系统
当窗口的人(Thread-A)办完事了:
交还钥匙:把 state 改回 0。
叫号:他看一眼身后(head.next)。如果发现你在睡觉,他会拍拍你的肩膀(LockSupport.unpark())。
接班:你醒了,发现自己就在 head 后面,于是顺理成章地拿走钥匙,把 head 变成你自己。
手撕 AQS 独占模式
AQS 独占模式交互演示
重置
ExclusiveOwnerThread
None
Thread A Lock
Thread B Lock
Unlock
等待操作...
实现自己的 AQS:MyAQS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 import sun.misc.Unsafe;import java.lang.reflect.Field;import java.util.concurrent.locks.LockSupport;public abstract class MyAQS { private static final Unsafe U; private static final long STATE, HEAD, TAIL; static { try { Field f = Unsafe.class.getDeclaredField("theUnsafe" ); f.setAccessible(true ); U = (Unsafe) f.get(null ); STATE = U.objectFieldOffset(MyAQS.class.getDeclaredField("state" )); HEAD = U.objectFieldOffset(MyAQS.class.getDeclaredField("head" )); TAIL = U.objectFieldOffset(MyAQS.class.getDeclaredField("tail" )); } catch (Exception e) { throw new ExceptionInInitializerError (e); } } private volatile int state; protected final int getState () { return state; } protected final void setState (int s) { state = s; } protected final boolean compareAndSetState (int expect, int update) { return U.compareAndSwapInt(this , STATE, expect, update); } static final int WAITING = 1 ; static final int CANCELLED = -1 ; static class Node { volatile Node prev; volatile Node next; Thread waiter; volatile int status; Node() {} Node(Thread waiter) { this .waiter = waiter; } private static final long PREV; private static final long NEXT; private static final long STATUS; static { try { PREV = U.objectFieldOffset(Node.class.getDeclaredField("prev" )); NEXT = U.objectFieldOffset(Node.class.getDeclaredField("next" )); STATUS = U.objectFieldOffset(Node.class.getDeclaredField("status" )); } catch (NoSuchFieldException e) { throw new ExceptionInInitializerError (e); } } final void setPrevRelaxed (Node p) { U.putObject(this , PREV, p); } } private transient volatile Node head; private transient volatile Node tail; public final void acquire (int arg) { if (!tryAcquire(arg)) { Node node = enqueue(); boolean interrupted = false ; for (;;) { Node pred = node.prev; if (pred == head && tryAcquire(arg)) { setHead(node); pred.next = null ; if (interrupted) { Thread.currentThread().interrupt(); } return ; } if (node.status == 0 ) { node.status = WAITING; } else { LockSupport.park(this ); if (Thread.interrupted()) { interrupted = true ; } node.status = 0 ; } } } } public final void release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.next != null ) { signalNext(h); } } } private Node enqueue () { Node node = new Node (Thread.currentThread()); for (;;) { Node t = tail; if (t == null ) { Node h = new Node (); if (U.compareAndSwapObject(this , HEAD, null , h)) { tail = h; } } else { node.setPrevRelaxed(t); if (U.compareAndSwapObject(this , TAIL, t, node)) { t.next = node; return node; } } } } private void setHead (Node node) { head = node; node.waiter = null ; node.prev = null ; } private void signalNext (Node h) { Node s = h.next; if (s == null || s.status < 0 ) { s = null ; for (Node t = tail; t != null && t != h; t = t.prev) { if (t.status >= 0 ) { s = t; } } } if (s != null ) { if (U.compareAndSwapInt(s, Node.STATUS, WAITING, 0 )) { LockSupport.unpark(s.waiter); } } } protected abstract boolean tryAcquire (int arg) ; protected abstract boolean tryRelease (int arg) ; }
编写简单的 lock 验证 AQS 的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class MyLock extends MyAQS { public void lock () { acquire(1 ); } public void unlock () { release(1 ); } @Override protected boolean tryAcquire (int arg) { return compareAndSetState(0 , 1 ); } @Override protected boolean tryRelease (int arg) { setState(0 ); return true ; } public static void main (String[] args) throws InterruptedException { MyLock lock = new MyLock (); int [] count = {0 }; Thread t1 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { lock.lock(); count[0 ]++; lock.unlock(); } }); Thread t2 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { lock.lock(); count[0 ]++; lock.unlock(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("最终结果 (应为 200000): " + count[0 ]); } }
AQS 共享模式又当如何? 共享模式最复杂的地方在于 “唤醒传播”(Propagating):在独占模式下,一个人出去了,只需叫醒后继的一个人;而在共享模式下(如 Semaphore 或 CountDownLatch),如果满足条件,一个人的释放会引发连锁反应,瞬间叫醒队列中所有排队的共享线程。
为了增加共享模式的逻辑,我们需要区分节点类型。共享节点不仅要能排队,还要知道自己是 “共享” 的。完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 import sun.misc.Unsafe;import java.lang.reflect.Field;import java.util.concurrent.locks.LockSupport;public abstract class MyAQS { private static final Unsafe U; private static final long STATE, HEAD, TAIL; private static final long NODE_STATUS, NODE_NEXT, NODE_PREV; static { try { Field f = Unsafe.class.getDeclaredField("theUnsafe" ); f.setAccessible(true ); U = (Unsafe) f.get(null ); STATE = U.objectFieldOffset(MyAQS.class.getDeclaredField("state" )); HEAD = U.objectFieldOffset(MyAQS.class.getDeclaredField("head" )); TAIL = U.objectFieldOffset(MyAQS.class.getDeclaredField("tail" )); NODE_STATUS = U.objectFieldOffset(Node.class.getDeclaredField("status" )); NODE_NEXT = U.objectFieldOffset(Node.class.getDeclaredField("next" )); NODE_PREV = U.objectFieldOffset(Node.class.getDeclaredField("prev" )); } catch (Exception e) { throw new ExceptionInInitializerError (e); } } private volatile int state; protected final int getState () { return state; } protected final void setState (int s) { state = s; } protected final boolean compareAndSetState (int expect, int update) { return U.compareAndSwapInt(this , STATE, expect, update); } static final int WAITING = 1 ; static final int CANCELLED = -1 ; static final int PROPAGATE = -2 ; static class Node { volatile Node prev; volatile Node next; volatile int status; volatile Thread waiter; volatile Node nextWaiter; static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; Node() {} Node(Thread waiter, Node mode) { this .waiter = waiter; this .nextWaiter = mode; } final boolean isShared () { return nextWaiter != EXCLUSIVE; } final void setPrevRelaxed (Node p) { U.putObject(this , NODE_PREV, p); } } private transient volatile Node head; private transient volatile Node tail; public final void acquire (int arg) { if (!tryAcquire(arg)) { Node node = enqueue(Node.EXCLUSIVE); boolean interrupted = false ; for (;;) { Node pred = node.prev; if (pred == head && tryAcquire(arg)) { setHead(node); pred.next = null ; if (interrupted) { Thread.currentThread().interrupt(); } return ; } if (shouldParkAfterFailedAcquire(pred, node)) { LockSupport.park(this ); if (Thread.interrupted()) { interrupted = true ; } } } } } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) { doAcquireShared(arg); } } private void doAcquireShared (int arg) { final Node node = enqueue(Node.SHARED); boolean interrupted = false ; try { for (;;) { final Node p = node.prev; if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); node.next = null ; if (interrupted) { Thread.currentThread().interrupt(); } return ; } } if (shouldParkAfterFailedAcquire(p, node)) { LockSupport.park(this ); if (Thread.interrupted()) { interrupted = true ; } } } } catch (Throwable t) { throw t; } } private Node enqueue (Node mode) { Node node = new Node (Thread.currentThread(), mode); for (;;) { Node t = tail; if (t == null ) { Node h = new Node (); if (U.compareAndSwapObject(this , HEAD, null , h)) { tail = h; } } else { node.setPrevRelaxed(t); if (U.compareAndSwapObject(this , TAIL, t, node)) { t.next = node; return node; } } } } private void setHead (Node node) { head = node; node.waiter = null ; node.prev = null ; } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.status < 0 || (h = head) == null || h.status < 0 ) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.status; if (ws == WAITING) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.status > 0 ); pred.next = node; } else { U.compareAndSwapInt(pred, NODE_STATUS, ws, WAITING); } return false ; } public final void release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.status != 0 ) { signalNext(h); } } } public final void releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.status; if (ws == WAITING) { if (!U.compareAndSwapInt(h, NODE_STATUS, WAITING, 0 )) { continue ; } signalNext(h); } else if (ws == 0 && !U.compareAndSwapInt(h, NODE_STATUS, 0 , PROPAGATE)) { continue ; } } if (h == head) { break ; } } } private void signalNext (Node h) { Node s = h.next; if (s == null || s.status < 0 ) { s = null ; for (Node t = tail; t != null && t != h; t = t.prev) { if (t.status >= 0 ) { s = t; } } } if (s != null ) { LockSupport.unpark(s.waiter); } } protected abstract boolean tryAcquire (int arg) ; protected abstract boolean tryRelease (int arg) ; protected abstract int tryAcquireShared (int arg) ; protected abstract boolean tryReleaseShared (int arg) ; }
对应的 MyLock 更新:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public class MyLock extends MyAQS { private volatile Thread exclusiveOwnerThread; public void lock () { acquire(1 ); } public void unlock () { release(1 ); } @Override protected boolean tryAcquire (int arg) { Thread current = Thread.currentThread(); int s = getState(); if (s == 0 ) { if (compareAndSetState(0 , 1 )) { exclusiveOwnerThread = current; return true ; } } else if (current == exclusiveOwnerThread) { setState(s + arg); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { if (Thread.currentThread() != exclusiveOwnerThread) { throw new IllegalMonitorStateException (); } int nextc = getState() - arg; if (nextc == 0 ) { exclusiveOwnerThread = null ; setState(nextc); return true ; } setState(nextc); return false ; } @Override protected int tryAcquireShared (int arg) { return 0 ; } @Override protected boolean tryReleaseShared (int arg) { return false ; } public static void main (String[] args) throws InterruptedException { MyLock lock = new MyLock (); int [] count = {0 }; Thread t1 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { lock.lock(); count[0 ]++; lock.unlock(); } }); Thread t2 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { lock.lock(); count[0 ]++; lock.unlock(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("最终结果 (应为 200000): " + count[0 ]); } }
用我们的 MyAQS 实现一个门栓所(CountDownLatch 简化版)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public class MyCountDownLatch { private static class Sync extends MyAQS { Sync(int count) { setState(count); } @Override protected int tryAcquireShared (int arg) { return (getState() == 0 ) ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int arg) { for (;;) { int c = getState(); if (c == 0 ) { return false ; } int nextc = c - 1 ; if (compareAndSetState(c, nextc)) { return nextc == 0 ; } } } @Override protected boolean tryAcquire (int arg) { return false ; } @Override protected boolean tryRelease (int arg) { return false ; } } private final Sync sync; public MyCountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } public void await () { sync.acquireShared(1 ); } public void countDown () { sync.releaseShared(1 ); } public static void main (String[] args) throws InterruptedException { int runnerCount = 5 ; MyCountDownLatch latch = new MyCountDownLatch (1 ); for (int i = 0 ; i < runnerCount; i++) { final int id = i; new Thread (() -> { System.out.println("运动员 " + id + " 已就位,等待发令枪..." ); latch.await(); System.out.println("运动员 " + id + " 冲出起跑线!" ); }).start(); } Thread.sleep(2000 ); System.out.println("--- 砰!发令枪响 ---" ); latch.countDown(); } }
测试结果:
1 2 3 4 5 6 7 8 9 10 11 运动员 2 已就位,等待发令枪... 运动员 0 已就位,等待发令枪... 运动员 3 已就位,等待发令枪... 运动员 4 已就位,等待发令枪... 运动员 1 已就位,等待发令枪... --- 砰!发令枪响 --- 运动员 0 冲出起跑线! 运动员 1 冲出起跑线! 运动员 3 冲出起跑线! 运动员 2 冲出起跑线! 运动员 4 冲出起跑线!
你可以将独占模式 (Exclusive Mode) 想象成只有一个坑位的单人洗手间(ReentrantLock),它的核心逻辑是:一次只能进一个人,其他人只能在门口排队。主要规则是:
抢占 (Lock):门锁着吗?没锁我就冲进去,反手把门锁上(CAS 修改 state 为 1)。
重入 (Reentrancy):如果你已经在里面了,你还可以进出洗手间里的隔断(state 从 1 变成 2),毕竟里面只有你。
阻塞 (Park):如果门锁着,外面的人(Thread B/C)只能在门口排队(进入 CLH 队列),并且闭目养神(LockSupport.park)。
唤醒 (Unlock):你出来时,必须彻底打开门锁(state 归零),然后拍拍后面那个人的肩膀:“喂,该你了。”(unparkSuccessor)。
而对共享模式 (Shared Mode),可以把它想象成电影院的检票闸机(Semaphore 或 CountDownLatch),它的核心逻辑是:只要票没检完,大家可以一起进,且一个人进去了会招呼后面的人赶紧进。主要规则是:
获取 (Acquire):闸机显示还有 5 张票(state = 5)。
传播 (Propagate) —— 这是最关键的区别:第一个人刷票进去后,他不会像独占锁那样进完就关门,而是会回头招手喊:票还有,后面的快来!(setHeadAndPropagate)。
并发执行:只要票没发完,Thread A、B、C 可以同时在放映厅里看电影。他们之间不是互斥的,而是共享这个资源。
释放 (Release):当一个观众看完出来(state++),他释放了一个空位,这同样可能引发 “连锁反应”,唤醒还在门口等票的人。
总结一句话:独占锁是 “我锁门,你等着”,强调的是绝对的安全;共享锁是 “我进门,你跟上”,强调的是极致的效率。
CAS 和 LockSupport 以上我们已经知道 AQS 的根基是 volatile state 以及 CAS and LockSupport 操作。我们现在将目光移到 CAS 和 LockSupport,打破砂锅问到底,这 TM 究竟都是什么东西?
CAS 的底层实现
当你在 Java 中调用 compareAndSetInt 时,HotSpot C++ 代码最终会生成一条特定的 CPU 指令。在 x86 架构下,CAS 使用 lock cmpxchg 指令来实现。这条指令会触发总线锁(Bus Lock)或缓存锁(Cache Lock,告诉 CPU:在这个指令执行期间,锁定这块内存地址,任何其他核心都不准改它。所以 CAS 的原子性并不是靠软件实现的,而是靠 CPU 硬件电路保证的。
1 2 3 4 5 6 public final class Unsafe { @IntrinsicCandidate public final native boolean compareAndSetInt (Object o, long offset, int expected, int x) ; }
第一层:JNI 映射 (unsafe.cpp)。源码路径 src/hotspot/share/prims/unsafe.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 UNSAFE_ENTRY (jboolean, Unsafe_CompareAndSetInt (JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) { oop p = JNIHandles::resolve (obj); void * addr = index_oop_from_field_offset_long (p, offset); return Atomic::cmpxchg ((jint*)addr, e, x) == e; } UNSAFE_END
第二层:原子操作抽象层 (atomic.hpp)。JVM 为了跨平台,定义了一个 Atomic 类。它会根据不同的操作系统和 CPU 架构(如 x86, ARM, RISC-V)选择不同的实现。
1 2 3 4 5 template <typename T, typename D, typename U>inline D Atomic::cmpxchg (D* dest, T compare_value, U exchange_value) { return PlatformCmpxchg <sizeof (D)>()(dest, compare_value, exchange_value); }
第三层:硬件级别实现 (以 x86 为例)。这是最关键的一层。在 Linux x86 架构下,JVM 会直接写一段内联汇编。源码路径:src/hotspot/os_cpu/linux_x86/atomic_linux_x86.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 template <>template <typename T>inline T Atomic::PlatformCmpxchg<4 >::operator ()(T* dest, T compare_value, T exchange_value) const { __asm__ volatile ( "lock cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest) : "cc" , "memory" ) ; return exchange_value; }
相比于传统的锁,CAS 的 “节省” 体现在避免了内核态切换(Context Switch)。
传统锁(重量级):如果拿不到锁,CPU 会把当前线程挂起(Park),这涉及到从 “用户态” 切换到 “内核态”,保存寄存器快照,存入调度队列。这套流程需要消耗数千个 CPU 周期。
CAS(无锁):它只是一条普通的汇编指令。如果失败了,线程还在运行,只需在循环里再执行一次指令。在竞争不激烈时,它比 “挂起再唤醒” 快得多。
CAS(无锁)
底层物理原理:MESI 协议与总线仲裁 。CAS 能够实现原子性且高效,依赖的是 CPU 硬件内部的缓存一致性协议,最常见的是 MESI 协议。在早期的 CPU 中,lock 前缀真的会锁住总线(Bus),导致其他核心无法访问内存,这确实很浪费。但现代 CPU 使用的是缓存锁定:
当一个核心要执行 lock cmpxchgl 时,它会尝试获取该内存地址所在的缓存行(Cache Line)的独占权(Exclusive)。
如果该缓存行已经在其 L1/L2 缓存中,CPU 只需在内部标记该行被锁定,而不需要去锁总线。
真正物理成本在硬件冲突判定 ,这是 CAS 真正消耗资源的地方(电子信号在总线上的往返时间):
在执行 lock cmpxchgl 指令时,Core 1 的指令流水线(Pipeline)会卡住。
Core 1 发出 RFO。
它必须等待总线仲裁,等待 Core 2/3 的 ACK 回执。这个过程可能消耗几十或上百个时钟周期(几十纳秒),相比传统的重量级锁动不动消耗几千甚至上万个时钟周期(~10 微秒),效率有百倍的提升。
在此期间,Core 1 的这个特定执行单元是停滞的。这就是物理意义上的消耗。
如果 RFO 拿到了,执行了 cmpxchg,但发现内存里的值已经被别人改了(Compare 失败):
指令结束:这条 cmpxchg 指令就算执行完了,它会返回一个 “失败” 的结果。
Java层面:在我们的 MyAQS 自旋锁里,你会看到一个 for(;;)。CAS 失败后,程序会立刻发起下一次循环,重新尝试执行指令。
如果 Core 1 和 Core 2 同时发送 RFO 想改同一个地址,总线仲裁器(Bus Arbiter)会根据算法(如轮询或固定优先级)判定谁赢。输掉的核心必须撤回请求,等赢家改完并释放缓存行后,再重新发起 RFO。在高并发下,大量的 CPU 周期都浪费在了 “等回执” 和 “等仲裁” 上。虽然 CPU 占用率看着是 100%,但实际在做有用功(有效指令执行)的时间比例很低。这就是总线风暴(Bus Storm)。
传统重量级锁:钱花在哪了?
用户态到内核态的切换 (Trap):当 synchronized 升级为重量级锁,或者 ReentrantLock 竞争失败导致线程挂起时,JVM 会调用操作系统的底层接口(如 Linux 的 futex)。
CPU 必须停止当前指令流,执行一条 syscall 指令。
硬件要进行特权级检查。
上下文切换 (Context Switch):这是最沉重的负担。
保存现场:把当前线程的寄存器值、程序计数器(PC)、堆栈指针(SP)存入内存(PCB 进程控制块)。
调度决策:操作系统内核决定下一个该谁运行。
恢复现场:从内存读取另一个线程的状态,装载进寄存器。
TLB/缓存失效:新的线程运行在不同的内存地址空间,CPU 的 TLB(地址转换快照) 和 L1/L2 缓存可能全部作废,导致新线程刚开始跑的时候极慢。
需要注意:CAS 也不是所有场景下都有优势。如果互斥方法执行时间较长(排队时间很长),CAS 情况下 CPU 会一直处于 100% 的忙碌状态,做无效的自旋。如果 100 个线程自旋 1 毫秒,消耗的能量和对总线的压力是灾难性的。而对重量级锁而言,虽然切换的那一下很贵,但一旦线程睡着了,它完全不消耗 CPU 资源。CAS 适合 ‘短平快’ 的场景,即预期竞争很快就能结束。如果你预见到这把锁会被持有很多秒,请毫不犹豫地使用阻塞锁(重量级锁),因为保护 CPU 的‘体力’(时钟周期)比那点切换开销更重要。
LockSupport 的底层实现
实际上,LockSupport 也是 Unsafe 的 “高级包装”,而 Unsafe 是 JVM 留给 Java 通往 C++ 与操作系统的 “后门”。翻开 LockSupport 的源码,你会发现它的每一个核心方法(park, unpark)最终都调用了 Unsafe:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class LockSupport { private static final Unsafe U = Unsafe.getUnsafe(); private static final long PARKBLOCKER = U.objectFieldOffset(Thread.class, "parkBlocker" ); private static final long TID = U.objectFieldOffset(Thread.class, "tid" ); public static void park (Object blocker) { Thread t = Thread.currentThread(); U.putReferenceOpaque(t, PARKBLOCKER, blocker); U.park(false , 0L ); U.putReferenceOpaque(t, PARKBLOCKER, null ); } public static void unpark (Thread thread) { if (thread != null ) { U.unpark(thread); } } }
执行链条:
Java 层调用 U.park()。
JVM 根据方法名找到对应的 C++ 实现函数(在 HotSpot 源码中通常是 Unsafe_Park)。
C++ 函数内部获取当前线程对应的 C++ Parker 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package jdk.internal.misc; public final class Unsafe { @IntrinsicCandidate public native void park (boolean isAbsolute, long time) ; @IntrinsicCandidate public native void unpark (Object thread) ; }
park 和 unpark 在 C++ 层面是基于线程级许可机制实现的每个 Java 线程在 JVM 内部都关联一个 Parker 实例。它的核心是一个计数器 _counter(即我们常说的 Permit)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 class Parker : public os::PlatformParker {private : volatile int _counter ; public : Parker () : _counter(0 ) {} void park (bool isAbsolute, jlong time) ; void unpark () ; }; void Parker::park (bool isAbsolute, jlong time) { if (Atomic::xchg (0 , &_counter) > 0 ) return ; Thread* thread = Thread::current (); assert (thread != NULL , "invariant" ); if (Thread::is_interrupted (thread, false )) return ; pthread_mutex_lock (_mutex); if (_counter > 0 ) { _counter = 0 ; pthread_mutex_unlock (_mutex); return ; } if (time == 0 ) { pthread_cond_wait (_cond, _mutex); } else { os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime); } _counter = 0 ; pthread_mutex_unlock (_mutex); } void Parker::unpark () { int s ; pthread_mutex_lock (_mutex); s = _counter; _counter = 1 ; if (s < 1 ) { pthread_cond_signal (_cond); } pthread_mutex_unlock (_mutex); }
unpark(thread) :将该线程的 Permit 置为 1。如果线程正在 park 阻塞中,通过底层操作系统的条件变量(Condition Variable)叫醒它。
park() :检查 Permit。
如果是 1,立刻消费掉它(变回 0)并返回(这就是为什么先 unpark 再 park 不会阻塞)。
如果是 0,调用操作系统的 pthread_cond_wait(Linux/Unix)或 WaitForSingleObject(Windows)。
显示锁们 ReentrantLock 实际案例
解决超卖问题的库存扣减(并发资源竞争)
这是分布式系统或电商场景中最经典的案例。虽然现在常用 Redis 分布式锁,但在单机环境下,ReentrantLock 是性能和可靠性的首选。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class StockService { private int stock = 100 ; private final ReentrantLock lock = new ReentrantLock (); public void deductStock (int count) { lock.lock(); try { if (stock >= count) { Thread.sleep(10 ); stock -= count; System.out.println(Thread.currentThread().getName() + " 扣减成功,剩余:" + stock); } else { System.out.println(Thread.currentThread().getName() + " 库存不足" ); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
带有超时的尝试锁(避免长时间阻塞)
在高性能接口中,我们不希望线程因为拿不到锁而无限期挂起(这会导致 Tomcat 线程池耗尽)。利用 tryLock() 可以实现 “拿不到就跑” 或者 “等一会再跑” 的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void tryOrder (long timeout, TimeUnit unit) { try { if (lock.tryLock(timeout, unit)) { try { doCreateOrder(); } finally { lock.unlock(); } } else { throw new RuntimeException ("系统繁忙,请稍后再试" ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
多条件唤醒的阻塞队列(Condition 的精准接力)
这是 ReentrantLock 相比 synchronized 最强大的地方——支持多个 Condition。它允许你精准地唤醒某类线程(比如只唤醒消费者,或者只唤醒生产者),这也是 Java ArrayBlockingQueue 的底层实现原理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class MyBuffer { private final ReentrantLock lock = new ReentrantLock (); private final Condition producerCondition = lock.newCondition(); private final Condition consumerCondition = lock.newCondition(); private final Object[] items = new Object [10 ]; private int count = 0 ; public void put (Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { producerCondition.await(); } items[count++] = x; consumerCondition.signal(); } finally { lock.unlock(); } } public Object take () throws InterruptedException { lock.lock(); try { while (count == 0 ) { consumerCondition.await(); } Object x = items[--count]; producerCondition.signal(); return x; } finally { lock.unlock(); } } }
灵活性:tryLock() 可以防止死锁和长时间卡顿。 公平性:new ReentrantLock(true) 可以开启公平锁(按排队顺序拿锁,虽然性能略低)。 精准度:Condition 可以实现比 notifyAll() 更细粒度的线程调度。
关于 Condition 的介绍 在上面 MyBuffer 的案例中,我们已经看到了Condition 的使用。实际上,在没有 Condition 之前,我们只能用 Object.wait() 和 notify()。但它们有一个致命缺点:无法定向唤醒。如果你调用 notifyAll(),所有在等待的线程(无论是生产者还是消费者)都会被唤醒去抢锁,这会造成严重的 “锁竞争” 和“无效唤醒”。Condition 的出现解决了这个问题:
多休息室机制:一个 Lock 可以创建多个 Condition 实例。
精准唤醒:你可以让生产者去 conditionA 等待,消费者去 conditionB 等待。当空间足够时,只唤醒 conditionA 里的生产者,互不干扰。
特性
Object (wait/notify)
Condition (await/signal)
关联锁
对象的 Monitor 锁
ReentrantLock 实例
等待队列数量
每个对象仅 1 个
每个 Lock 可拥有多个
定向唤醒
不支持(只能 notify 或 notifyAll)
支持
超时/中断响应
支持
支持更丰富的 API (如 awaitNanos)
性能
高并发下竞争激烈
细粒度控制,性能更好
要看透 Condition,你只需要理解 AQS 维护的两套队列:
同步队列 (Sync Queue) :这就是我们之前研究的 CLH 队列,存放的是正在抢锁的线程。
等待队列 (Wait Queue / Condition Queue) :这是 Condition 对象内部维护的一个单向链表。
await() 的过程:
入队:当前线程(已持有锁)被封装成 Node,放入该 Condition 的等待队列中。
释放锁:彻底释放当前持有的锁(state 归零),以便其他线程能拿到锁。
挂起:调用 LockSupport.park(this),线程在这一行停住。
signal() 的过程:
出队:将 Condition 等待队列中的第一个节点移除。
转移:将这个节点重新放入 AQS 的同步队列(Sync Queue)的队尾。
唤醒:此时该线程并没有立即执行,而是开始排队抢锁。抢到锁后,它会从 await() 方法处醒来并继续执行。
避坑指南:
永远在 while 循环中使用 await():这是为了防止虚假唤醒(Spurious Wakeup)。线程醒来后必须再次检查条件(如 count == 0),确保条件真的满足。
必须持有锁:调用 await() 或 signal() 之前必须先 lock.lock(),否则会抛出 IllegalMonitorStateException(这和调用 wait() 必须先进入 synchronized 块是一个道理)。
ReentrantReadWriteLock 我们已经知道,在 AQS 中,state 是一个 32 位的整数。ReentrantReadWriteLock 将其 “切” 成了两半:State = (读锁次数 << 16)+ 写锁重入次数
高 16 位:代表读锁(Shared)的状态,记录持有读锁的次数。
低 16 位:代表写锁(Exclusive)的状态,记录写锁的重入次数。
这种设计利用了位运算的极速特性:
获取读状态:state >>> 16(无符号右移)。
获取写状态:state & 0x0000FFFF(位与掩码)。
读写锁的核心原则是:读读兼容,读写互斥,写写互斥 。适用于 读多写少 (如缓存、配置项)的场景。
写锁(独占模式) ,当线程尝试获取写锁时:
检查 state 是否为 0。
如果 state != 0 且 读锁次数 > 0,获取失败(读锁占着位,写锁进不来)。
如果 state != 0 且 写锁次数 > 0,但持有者不是自己,获取失败。
否则,CAS 增加低 16 位的值。
读锁(共享模式) ,当线程尝试获取读锁时:
检查写锁次数(低 16 位)。
如果写锁次数不为 0,且持有写锁的不是当前线程,获取失败(写锁占着位,读锁进不来)。
否则,CAS 增加高 16 位的值。由于是共享模式,多个线程可以同时成功。
特殊机制,锁降级 (Lock Downgrading):持有写锁的线程,可以再去获取读锁,然后释放写锁。
流程:获取写锁 → 获取读锁 → 释放写锁 → 依然持有读锁。
目的:保证数据可见性。如果你先释放写锁再抢读锁,中间可能被别的写线程插足修改了数据。通过锁降级,可以在不释放 “独占权” 的情况下,平滑过渡到 “共享权”。
注意: 不支持锁升级(持有读锁去抢写锁会造成死锁,因为多个读线程可能都在等对方放锁)。
案例一:高并发本地缓存(最经典)
如果你在 Spring Boot 中自己实现一个简单的缓存池(不使用 Redis),用 HashMap 配合 ReentrantLock 会导致所有读操作也排队,性能极差。而读写锁可以让上千个线程同时读取,只有在数据更新时才阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CacheDataCenter { private final Map<String, Object> map = new HashMap <>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock (); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Object get (String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } public void put (String key, Object value) { w.lock(); try { map.put(key, value); } finally { w.unlock(); } } }
案例二:锁降级(保证数据可见性)
常用于 “读取-判断-更新” 的逻辑中。比如:你读取了一个配置,发现过期了,需要更新它,但在更新完后你还要继续使用这个配置进行业务处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void processCachedData () { r.lock(); if (!cacheValid) { r.unlock(); w.lock(); try { if (!cacheValid) { data = fetchFromDB(); cacheValid = true ; } r.lock(); } finally { w.unlock(); } } try { use(data); } finally { r.unlock(); } }
注意:虽然读写锁看起来很美,但它有三个著名的缺陷:开销更大 :它的内部逻辑(处理位运算、两个 Lock 对象、复杂的传播逻辑)比 ReentrantLock 复杂得多。如果读操作耗时极短(比如只是简单的 return a + b),读写锁的额外开销反而会让吞吐量下降。写饥饿(Write Starvation) :如果读线程源源不断,写线程可能永远抢不到锁,一直等下去。不支持锁升级 :持有读锁时申请写锁会导致死锁。
StampedLock 原理和案例 StampedLock 在 JDK 8 中被引入,它并不是 AQS 的子类,而是通过一个 long 类型的 Stamp(戳记)和一套完全不同的状态机来实现的。它的精髓在于引入了 “乐观读” ,这让读操作在大多数情况下完全不加锁。
① 乐观读 (Optimistic Reading) —— 性能杀手锏,这是它快的根本原因。
逻辑:它并不真正加锁,而是返回一个 stamp。所以读操作不会阻塞写操作。
过程:线程读取数据,读完后通过 validate(stamp) 检查。如
果期间没有写操作,校验成功,直接返回;
如果校验失败(说明有写操作进来了),它会升级为传统的悲观读锁。
② 悲观读 (Read Lock)——与 ReentrantReadWriteLock 的读锁类似,会阻塞写操作,但允许多个读线程并发。
③ 写锁 (Write Lock)——独占锁,阻塞一切读写操作。
StampedLock 的状态表示(它使用一个 long state):
低 7 位:记录读锁的数量。
第 8 位:写锁标志位。
其余高位:版本号(每次写操作都会增加版本号,用于 validate 校验)。
典型案例:高性能坐标系点位读取
这是 StampedLock 官方文档中最经典的案例。假设我们要读取一个二维坐标 (x, y),由于 x 和 y 必须保持一致性,使用乐观读可以极大提升效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Point { private double x, y; private final StampedLock sl = new StampedLock (); void moveIfAtOrigin (double newX, double newY) { long stamp = sl.tryOptimisticRead(); double curX = x, curY = y; if (!sl.validate(stamp)) { stamp = sl.readLock(); try { curX = x; curY = y; } finally { sl.unlockRead(stamp); } } if (curX == 0.0 && curY == 0.0 ) { long ws = sl.writeLock(); try { x = newX; y = newY; } finally { sl.unlockWrite(ws); } } } }
同步器们 CountDownLatch 案例 CountDownLatch基于 AQS 的共享模式实现。它的逻辑非常纯粹:就是一个倒计时门闩。
状态定义 (State):AQS 的 state 在这里被定义为计数器,new CountDownLatch(N) 会将 state 设置为 N。
阻塞逻辑 (await):
线程调用 await() 时,会触发 tryAcquireShared。
判断准则:只要 state != 0,tryAcquireShared 就返回负数(表示获取失败)。
结果:线程进入 AQS 同步队列阻塞(LockSupport.park),就像在门口等待起跑指令的运动员。
释放逻辑 (countDown):
线程调用 countDown() 时,会触发 tryReleaseShared。
操作:通过 CAS 将 state 减 1。
关键点:只有当 state 减到 0 的那一刻,tryReleaseShared 才会返回 true。
传播(Propagate):一旦 state 归零,AQS 会调用 doReleaseShared,从头节点开始,依次唤醒所有在队列中等待的线程。
案例一:多服务并行调用的“数据聚合”
在微服务架构中,一个详情页可能需要调用:商品服务、库存服务、评价服务、营销服务。为了提高响应速度,我们会并行调用,最后统一汇总。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class DataAggregationDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (4 ); ExecutorService executor = Executors.newFixedThreadPool(4 ); System.out.println("--- 页面请求开始,启动并行查询 ---" ); long start = System.currentTimeMillis(); executor.execute(new Task ("商品详情" , 200 , latch)); executor.execute(new Task ("库存状态" , 150 , latch)); executor.execute(new Task ("用户评价" , 500 , latch)); executor.execute(new Task ("营销活动" , 300 , latch)); boolean success = latch.await(1 , TimeUnit.SECONDS); if (success) { System.out.println("--- 所有数据获取成功,耗时: " + (System.currentTimeMillis() - start) + "ms ---" ); } else { System.out.println("--- 部分接口超时,进行降级处理 ---" ); } executor.shutdown(); } static class Task implements Runnable { private final String name; private final int cost; private final CountDownLatch latch; Task(String name, int cost, CountDownLatch latch) { this .name = name; this .cost = cost; this .latch = latch; } @Override public void run () { try { Thread.sleep(cost); System.out.println(name + " 查询完成" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } } }
模拟高并发抢购(发令枪模式)
这种模式下,CountDownLatch 的计数器设为 1。所有的“运动员”线程都在等待这一个信号,从而实现瞬间的爆发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class HighConcurrencyDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch fireGun = new CountDownLatch (1 ); int runners = 5 ; for (int i = 0 ; i < runners; i++) { new Thread (() -> { try { System.out.println(Thread.currentThread().getName() + " 已就位..." ); fireGun.await(); System.out.println(Thread.currentThread().getName() + " 冲击秒杀接口!" ); } catch (InterruptedException e) { e.printStackTrace(); } }, "运动员-" + i).start(); } Thread.sleep(2000 ); System.out.println("--- 砰!发令枪响 ---" ); fireGun.countDown(); } }
分段大数据处理(任务拆解)
假设有一个巨大的任务需要拆分成多个子任务,主线程需要汇总所有子任务的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class BatchProcessDemo { public static void main (String[] args) throws InterruptedException { int totalRows = 1000 ; int threadCount = 10 ; int rowsPerThread = totalRows / threadCount; CountDownLatch latch = new CountDownLatch (threadCount); AtomicInteger totalProcessed = new AtomicInteger (0 ); for (int i = 0 ; i < threadCount; i++) { final int startRow = i * rowsPerThread; new Thread (() -> { try { System.out.println(Thread.currentThread().getName() + " 正在处理行: " + startRow + " 到 " + (startRow + rowsPerThread)); Thread.sleep((long ) (Math.random() * 1000 )); totalProcessed.addAndGet(rowsPerThread); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }).start(); } latch.await(); System.out.println("--- 任务全部完成,总计入库: " + totalProcessed.get() + " 行数据 ---" ); } }
Semaphore 原理和案例 如果说 CountDownLatch 是一个 “倒计时发令枪”,那么 Semaphore 就是一个 “共享资源池的管理员”,它同样是基于 AQS 的 共享模式实现的。Semaphore 维护了一组 “许可”(Permits)。线程在访问资源前必须先获得许可,访问结束后释放许可。不同于 CountDownLatch 的一次性使用,Semaphore 是可以复用的(借了还,还了再借)。
状态定义 (State):AQS 的 state 在这里代表 “当前剩余的许可数量”。new Semaphore(5) 会将 state 设置为 5。
获取许可 (acquire):线程调用 acquire() 时,会触发 tryAcquireShared,它会尝试用 CAS 将 state 减去请求的数量。如果剩余 state >= 0 则 CAS 成功,线程继续执行;如果剩余 state < 0 则 CAS 失败,线程进入 AQS 同步队列阻塞(挂起),等待别人归还许可。
释放许可 (release),线程调用 release() 时,触发 tryReleaseShared。通过 CAS 将 state 加 1(归还许可)。
逻辑:通过 CAS 将 state 加 1(归还许可)。
传播(Propagate):一旦许可归还成功,它会调用 doReleaseShared,唤醒同步队列中排在最前面的线程,让它再次尝试抢票。
CountDownLatch最核心的价值在于 “限流”(Control Concurrency)。
案例:数据库连接池 / 资源池限流
假设数据库只允许 10 个并发连接,如果 100 个线程同时去连,数据库会宕机。我们可以用 Semaphore 挡在前面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class DatabasePoolDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); ExecutorService executor = Executors.newFixedThreadPool(10 ); for (int i = 0 ; i < 10 ; i++) { final int threadId = i; executor.execute(() -> { try { System.out.println("用户 " + threadId + " 正在排队等连接..." ); semaphore.acquire(); System.out.println("用户 " + threadId + " >>> 成功获取连接,开始查询数据..." ); Thread.sleep((long ) (Math.random() * 3000 )); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("用户 " + threadId + " <<< 释放连接" ); semaphore.release(); } }); } executor.shutdown(); } }
CyclicBarrier 原理和案例 CyclicBarrier(循环栅栏)基于 ReentrantLock 和 Condition 构建,它的核心逻辑是:人齐了才发车,且这辆车可以反复开。其工作机制更像是一个 “关卡”。
计数器逻辑 (parties & count):
parties:初始设定的参与人数(比如 5 人组队)。
count:当前还在等待的人数。每当一个线程调用 await(),count 就会减 1。
阻塞与唤醒 (Condition.await / signalAll):
当线程调用 await() 时,获取独占锁 ReentrantLock,count–。
如果 count > 0,调用 trip.await(),线程进入 Condition 的等待队列并释放锁。
如果 count == 0,说明最后一个人到了!此时会执行可选的 barrierAction(发车指令),然后调用 trip.signalAll() 唤醒所有人。
循环的奥秘 (Generation):
一旦所有人被唤醒,CyclicBarrier 会开启一个 “新一代 (New Generation)”。
它会把 count 重新重置为 parties。这就是为什么它叫 Cyclic(循环)。
CyclicBarrier 通常用于 “多线程分步协同计算” 的场景。
案例:并行计算结果汇总(分阶段任务)
假设一个复杂的计算任务分为三个阶段,必须所有线程都完成第一阶段,才能开始第二阶段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class CyclicBarrierDemo { public static void main (String[] args) { int players = 3 ; CyclicBarrier barrier = new CyclicBarrier (players, () -> { System.out.println("--- 所有人都到齐了,关卡开启! ---" ); }); for (int i = 0 ; i < players; i++) { final int id = i; new Thread (() -> { try { System.out.println("玩家 " + id + " 正在加载地图..." ); Thread.sleep((long ) (Math.random() * 2000 )); System.out.println("玩家 " + id + " 已到达第一关栅栏。" ); barrier.await(5 , TimeUnit.SECONDS); System.out.println("玩家 " + id + " 开始打怪..." ); Thread.sleep((long ) (Math.random() * 2000 )); System.out.println("玩家 " + id + " 已到达第二关栅栏。" ); barrier.await(); System.out.println("玩家 " + id + " 顺利通关!" ); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 玩家 2 正在加载地图... 玩家 1 正在加载地图... 玩家 0 正在加载地图... 玩家 1 已到达第一关栅栏。 玩家 2 已到达第一关栅栏。 玩家 0 已到达第一关栅栏。 --- 所有人都到齐了,关卡开启! --- 玩家 0 开始打怪... 玩家 1 开始打怪... 玩家 2 开始打怪... 玩家 1 已到达第二关栅栏。 玩家 2 已到达第二关栅栏。 玩家 0 已到达第二关栅栏。 --- 所有人都到齐了,关卡开启! --- 玩家 0 顺利通关! 玩家 1 顺利通关! 玩家 2 顺利通关!
处理方式
特点
适用场景
死等 (await)
简单但危险
确定任务百分百不会失败且不超时的环境
超时 (timed await)
安全,自动唤醒
最通用的后端接口处理方式
手动 reset
暴力重置,全员失败
需要统一回滚或重新开始的任务
Phaser 动态注销
智能降级,存活者继续
复杂的分布式计算,允许部分失败
Phaser 的原理和典型用法 Phaser(移相器)是 Java 7 引入的并发工具,堪称 JUC 家族中最复杂但也最强大的同步辅助类。
简单来说,Phaser = CyclicBarrier + CountDownLatch + 动态调整 + 分层架构 。
Phaser 的核心在于 Phase(阶段/相位)和 Parties(参与者)的动态解耦。
动态注册(Dynamic Registration):CyclicBarrier 的人数在构造时就定死了。而 Phaser 允许你在运行过程中随时 register()(增加人)或 arriveAndDeregister()(减少人)。
状态推进(Phase Advance):Phaser 内部维护一个 phase 数字。当所有注册的参与者都到达(arrive)时,phase 会自动加 1。这就像是游戏里的 “关卡数”。
非阻塞与阻塞的灵活选择:
arrive():我到了,但我还要去干别的,不用等我。
arriveAndAwaitAdvance():我到了,我要在这里等大家都到齐再进下一关。
典型用法:解决“队友掉线”与“动态增减”
案例:支持“自动减员”的任务协同
这个例子展示了:如果一个线程在执行过程中崩溃,它如何 “优雅退出”,让剩下的线程继续运行,而不是大家一起卡死。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class PhaserDemo { public static void main (String[] args) { Phaser phaser = new Phaser (3 ) { @Override protected boolean onAdvance (int phase, int registeredParties) { System.out.println("\n=== 第 " + phase + " 阶段完成,当前剩余人数: " + registeredParties + " ===\n" ); return registeredParties == 0 ; } }; for (int i = 0 ; i < 3 ; i++) { final int id = i; new Thread (() -> { try { System.out.println("玩家 " + id + " 正在加载地图..." ); Thread.sleep((long ) (Math.random() * 2000 )); if (id == 1 ) { throw new RuntimeException ("玩家 1 网络断开..." ); } System.out.println("玩家 " + id + " 到达第一关。" ); phaser.arriveAndAwaitAdvance(); System.out.println("玩家 " + id + " 开始打怪..." ); Thread.sleep((long ) (Math.random() * 2000 )); phaser.arriveAndAwaitAdvance(); System.out.println("玩家 " + id + " 顺利通关!" ); phaser.arriveAndDeregister(); } catch (Exception e) { System.err.println("!!! 捕获到异常: " + e.getMessage()); phaser.arriveAndDeregister(); } }).start(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 玩家 2 正在加载地图... 玩家 0 正在加载地图... 玩家 1 正在加载地图... 玩家 2 到达第一关。 !!! 捕获到异常: 玩家 1 网络断开... 玩家 0 到达第一关。 === 第 0 阶段完成,当前剩余人数: 2 === 玩家 0 开始打怪... 玩家 2 开始打怪... === 第 1 阶段完成,当前剩余人数: 2 === 玩家 2 顺利通关! 玩家 0 顺利通关! === 第 2 阶段完成,当前剩余人数: 0 ===
Exchanger 原理和案例 做一个形象的类比,Exchanger 就像两个人在公园长椅上交换情报:只有当两个人都坐下时,交换才会发生;如果只有一个人到了,他必须一直等着另一个人。Exchanger 是一个用于线程间协作的同步点,主要应用于两个线程在一个点上交换彼此的数据。
核心方法为 exchange(V x),这是一个阻塞式方法。配对逻辑如下:
线程 A 到达交换点,调用 exchange(“情报A”)。
由于 线程 B 还没到,线程 A 进入等待状态(WAITING)。
线程 B 终于到了,调用 exchange(“情报B”)。
关键时刻:Exchanger 内部将 A 的数据给 B,将 B 的数据给 A。
两个线程同时被唤醒,继续执行。
底层机制:槽位 (Slot) 与 消除 (Elimination)
单槽位:在低并发下,内部使用一个 Slot(槽位)。第一个到的线程把数据放进去,第二个到的线程取走并留下自己的数据。
多槽位(Arena):在高并发下,为了减少竞争,Exchanger 内部会使用类似 LongAdder 的 Arena(竞技场)数组。不同的线程对会在不同的槽位上进行交换,极大地提高了吞吐量。
典型案例:双缓冲区(Pipeline Buffer)
这是 Exchanger 最经典、最无可替代的场景:生产者-消费者模型的双缓冲优化。
通常的生产者-消费者模型使用 BlockingQueue。但如果数据量极大,频繁的入队/出队会产生大量的锁竞争。对于这种情况,我们可以准备两个缓冲区(Buffer A 和 Buffer B)。生产者往 A 里塞数据,消费者从 B 里取数据。当生产者塞满了,消费者取空了,两人交换缓冲区。
特性
BlockingQueue
Exchanger
数据流向
单向(生产 → 消费)
双向(互换)
参与人数
任意多个
必须成对 (2个)
性能优势
频繁锁竞争
极低的内存开销(直接引用交换)
典型比喻
传送带
秘密交易
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class ExchangerDemo { public static void main (String[] args) { Exchanger<List<String>> exchanger = new Exchanger <>(); new Thread (() -> { List<String> buffer = new ArrayList <>(); try { for (int i = 0 ; i < 2 ; i++) { System.out.println("生产者: 正在填充缓冲区..." ); Thread.sleep(2000 ); buffer.add("数据为 " + Math.random()); System.out.println("生产者: 缓冲区已满,大小 " + buffer.size() + ",等待交换..." ); buffer = exchanger.exchange(buffer); System.out.println("生产者: 交换成功,得到新缓冲区,大小: " + buffer.size()); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread (() -> { List<String> buffer = new ArrayList <>(); try { for (int i = 0 ; i < 2 ; i++) { System.out.println("消费者: 正在处理现有的空缓冲区 (等待中)..." ); buffer = exchanger.exchange(buffer); System.out.println("消费者: 拿到数据了,开始消费: " + buffer.get(0 )); Thread.sleep(1000 ); buffer.clear(); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
1 2 3 4 5 6 7 8 9 10 生产者: 正在填充缓冲区... 消费者: 正在处理现有的空缓冲区 (等待中)... 生产者: 缓冲区已满,大小 1,等待交换... 生产者: 交换成功,得到新缓冲区,大小: 0 生产者: 正在填充缓冲区... 消费者: 拿到数据了,开始消费: 数据为 0.23883715938894112 消费者: 正在处理现有的空缓冲区 (等待中)... 生产者: 缓冲区已满,大小 1,等待交换... 生产者: 交换成功,得到新缓冲区,大小: 0 消费者: 拿到数据了,开始消费: 数据为 0.6445937735171946
Exchanger 需要注意的问题:死锁风险 :如果你的线程数是奇数(比如 3 个线程调用 exchange),那么必然有一个线程会永远等下去,除非你使用带超时版本的 exchange(V x, long timeout, TimeUnit unit)。垃圾回收 :由于两个线程互相持有对方之前的对象引用,如果不注意手动清理,在某些复杂场景下可能会对 GC 造成微弱影响。
该怎么选同步组件?
JUC 同步组件的选择
你的并发场景属于哪种?
独占资源/互斥
限流/资源池
多线程协同等待
两线程数据交换