JUC AQS 及显式锁与同步器

在 Java 1.5 之前,开发者处理并发同步的唯一武器是 synchronized 关键字。虽然它简单、安全,且随着 JVM 的优化(偏向锁、轻量级锁等)在性能上已表现出色,但它本质上是一辆 “自动挡汽车”:虽然好开,却无法实现精准的制动、复杂的坡道起步或是极速下的换挡操控。

随着业务场景复杂度的飙升——我们需要锁的公平性、需要超时放弃机制、需要可中断的等待,以及在海量读取场景下实现更细粒度的读写分离。于是,java.util.concurrent.locks 体系应运而生。


体系介绍

AQS:万锁之魂

AQS (AbstractQueuedSynchronizer) 是支撑整个并发大厦的钢筋骨架。它巧妙地利用了一个 volatile 修饰的 state 变量和一个基于 CHL 锁(一种公平自旋锁)的双向链表,优雅地解决了线程排队、阻塞与唤醒的难题。理解了 AQS,就等于掌握了 JUC 体系的底层密码。


显式锁:掌控力的重塑

显式锁不再依赖 JVM 隐式的指令拦截,而是通过 Java 代码构建了一套完整的同步协议。它将 “进入临界区” 和 “离开临界区” 的控制权彻底交还给了开发者。

  • ReentrantLock:赋予了我们尝试拿锁(tryLock)和响应中断的能力。
  • ReentrantReadWriteLock:通过“读写分离”大幅提升了高并发读场景下的吞吐量。
  • StampedLock:以 “乐观读” 的黑科技,将无锁编程的思想引入了锁的体系。


同步器:协作的艺术

除了互斥,并发编程的另一大主题是 “协作”。JUC 提供的同步器组件:

  • CountDownLatch:发令枪(不可复用)。
  • CyclicBarrier:循环栅栏(可复用)。
  • Semaphore:车位管理员。
  • Exchanger:接头人。
  • Phaser:移相器。
  • LockSupport:线程阻塞原语,AQS 的底层核心。

本文将带你撕开 AQS 的神秘面纱,从底层源码到生产实战,深度剖析显式锁与同步器是如何在多核时代,通过精准的节律控制,将乱序的线程编织成高效执行的交响乐。


AQS

AQS 是整个并发包 juc 的灵魂,它为构建锁(Lock)和同步器(Synchronizer)提供了一套通用的多线程排队与状态管理框架。在 AbstractQueuedSynchronizer (AQS)中,所有复杂的同步逻辑最终都坍缩为两件事:

  • 对 state 的状态维护。
  • 对 Node(head/tail)双向链表的精细操作。


AQS 的地基

AQS 的真正 “地基” 是由 volatile stateCAS (Unsafe)LockSupport 构成的 “三驾马车” 以及 Java 层面的的 CLH 变体双向链表。它平衡了多核 CPU 的缓存可见性与操作系统线程调度的沉重开销,是 Java 并发编程史上最牛逼的工业设计之一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Unsafe:获得 “上帝权限”
* 通常 JVM 像是一个被围墙围起来的 “安全区”,你不能直接碰内存。但 Unsafe 是一把钥匙,允许你越过 JVM,直接操作物理内存。
* 因为 AQS 需要极高的执行效率,它需要像 C++ 一样直接对内存中的变量进行 CAS (原子比较交换),而不能通过普通的 Java 赋值(因为普通赋值不具备多线程下的原子性)。
*/
private static final Unsafe U = Unsafe.getUnsafe();
/**
* 假设一个 AQS 对象是一栋大楼,state 是大楼里的一个保险柜。那么 objectFieldOffset 就是在计算:“从大楼的正门进去,走多少米能准确走到保险柜门口”
* 底层 CAS 指令(如 U.compareAndSetInt)不认识 state 这个变量名,它只认识地址。有了这个偏移量(Offset),CPU 就能直接定位到那个特定的 4 字节(int)位置,瞬间完成值的修改。
*/
private static final long STATE = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
private static final long HEAD = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head");
private static final long TAIL = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail");

static {
// AQS 的运行高度依赖 LockSupport 类(用来挂起和唤醒线程)。
// 为了防止在抢锁的关键时刻,因为 LockSupport 类还没被 JVM 加载而导致短暂的延迟或初始化异常,AQS 在自己被初始化时,触发 JVM 去加载 LockSupport 类。
Class<?> ensureLoaded = LockSupport.class;
}

abstract static class Node {
volatile Node prev; // 指向前驱节点:入队时通过 CAS Tail 挂载
volatile Node next; // 指向后继节点:释放锁时,通过它找到下一个唤醒目标
Thread waiter; // 负载:当前正在队列中 “排队” 的线程对象

/**
* 主要的枚举值(常量)只有四个:
* 0:【默认初始状态】,刚入队或刚醒来时的状态。
* 1:【等待信号】线程准备挂起前,会将自己设为该状态。只有处于此状态,前驱释放锁时才会 unpark 它。
* 2:【条件等待】线程在 Condition 队列中等待(调用了 await)。
* 0x80000000:【已取消】这是一个负数(最高位为 1)。代表线程因为超时、中断或异常,已经放弃了争抢。
* 通过将 CANCELLED 设为负数,程序只需要一条简单的 CPU 指令(判断符号位)就能识别出节点是否失效,而不需要进行复杂的数值比较。
*/
volatile int status; // 节点状态:由外部 owner 修改,或通过原子位运算操作

// 它在清理队列(cleanQueue)时使用,比强一致性的 CAS 性能更好,允许在失败时重试。
final boolean casPrev(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, PREV, c, v);
}
final boolean casNext(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, NEXT, c, v);
}

// 原子性地清除某个状态位并返回旧值。例如,在唤醒前先清除 WAITING 标记,确保唤醒操作的幂等性。
final int getAndUnsetStatus(int v) { // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}

// Relaxed 系列方法:
// 告诉编译器和 CPU:“这里不需要强可见性屏障(Fence)”。
// 因为在入队等特定上下文下,单线程语义或随后的 CAS 已经保证了安全性,这样可以减少总线开销。
final void setPrevRelaxed(Node p) { // for off-queue assignment
U.putReference(this, PREV, p);
}
final void setStatusRelaxed(int s) { // for off-queue assignment
U.putInt(this, STATUS, s);
}
final void clearStatus() { // for reducing unneeded signals
U.putIntOpaque(this, STATUS, 0);
}

private static final long STATUS = U.objectFieldOffset(Node.class, "status");
private static final long NEXT = U.objectFieldOffset(Node.class, "next");
private static final long PREV = U.objectFieldOffset(Node.class, "prev");
}

// 抢占失败,线程构造 ExclusiveNode 或 SharedNode,通过 casTail 挂载到 tail 指向的双向链表中。
static final class ExclusiveNode extends Node { } // 独占模式:ReentrantLock 专用
static final class SharedNode extends Node { } // 共享模式:Semaphore/CountDownLatch 专用

// 当你在 ForkJoinPool 的线程中使用 Condition 阻塞时,FJP 能识别出这个 ConditionNode 并自动补偿增加一个工作线程,防止整个池子因为等待而卡死。
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // 指向 Condition 队列中的下一个等待者
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}

public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}

/**
* CLH 变体双向链表(同步队列):这是线程的 “候车大厅”。
* 队列头:代表当前持有锁的线程(或者 Dummy 节点),在 AQS 队列里,head 永远代表 “那个正在屋里办业务的人”。
* 队列尾:所有新来的抢锁失败线程都 CAS 到这里
*/
private transient volatile Node head;
private transient volatile Node tail;

/**
* 全局同步状态:JUC 各种工具类的逻辑核心,同步器的 “红绿灯”
* state 变量在不同的锁实现中扮演着完全不同的角色:
* 1、在 ReentrantLock 中:它代表 “锁的持有计数”。0 表示无人占用;1 表示被占用;>1 表示重入次数。
* 2、在 Semaphore 中:它代表 “剩余许可证数量”。
* 3、在 CountDownLatch 中:它代表 “还需要等待的事件数量”。
*/
private volatile int state;
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }

/**
* 基于 Unsafe 的 CAS 原子操作
* AQS 拒绝使用 synchronized 这种重型工具。它所有的状态切换(如抢锁、入队、出队)都通过 CAS 实现,确保在无锁状态下完成线程安全的原子更新。
*/
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}


AQS 工作流程的形象拆解

我们可以把 AQS 的工作流程拆解为四个极简的步骤。

Step1:核心大管家 State(那把唯一的钥匙)

  • state = 0:钥匙在桌上,谁都能抢。
  • state = 1:钥匙被某人拿走了。
  • state > 1:那个人不仅拿了钥匙,还怕弄丢,又给自己多套了几层(重入锁)。

它是怎么抢的?线程会用 CAS(原子操作)去改这个 state。就像几个人同时伸手抓钥匙,但物理定律保证只有一个人能抓到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 这个简单的 compareAndSetState 动作,就是 AQS 乃至整个 JUC 并发包的物理基石。
* 它可以原子地将同步状态 state 设置为给定的更新值(update),前提是当前值等于期望值(expect)。该操作具有 volatile 读和写的内存语义。
*
* 你可以把 U.compareAndSetInt 想象成一个在微观世界里发生的 “瞬间判定”。
* 它在 CPU 指令层面(通常是 cmpxchg 指令)完成了三件事:
* ① 看(Read):先看一眼内存中 state 的当前值是不是我以为的那个 expect(通常是 0)。
* ② 比(Compare):如果确实是 0,说明钥匙还在桌上,没被人拿走。
* ③ 换(Swap):瞬间把值改成 update(通常是 1),并返回 true。
*
* 为什么物理定律保证只有一个人能抓到?
* 因为 CPU 会锁住总线或者使用缓存一致性协议(MESI),确保在同一个时刻,只有一个核心能完成这套“看、比、换”动作。
* 如果有两个线程同时伸手,第一个成功的会把 state 从 0 变成 1,第二个线程再去“看”的时候,发现 state 已经是 1 了,不符合它预期的 0,于是“抓取失败”,返回 false。
*/
protected final boolean compareAndSetState(int expect, int update) {
// 使用 Unsafe (U) 直接操作内存中的 STATE 偏移量
return U.compareAndSetInt(this, STATE, expect, update);
}

让我们看看 ReentrantLock 是怎么调用这个方法的。这是非公平锁(NonfairSync)尝试拿锁的代码片段:

1
2
3
4
5
6
7
8
9
final void lock() {
// 线程上来直接 “伸手抓钥匙”
if (compareAndSetState(0, 1))
// 抓到了,把自己标记为锁的主人
setExclusiveOwnerThread(Thread.currentThread());
else
// 没抓到,老老实实去排队(走 acquire 流程)
acquire(1);
}


Step2:CLH 候车大厅(那个排队的双向链表)

如果没抢到钥匙怎么办?AQS 会把你领到一个双向链表队列里排队。这就是你看到的 Node head 和 Node tail。入队流程:

  • 你(线程)被打包成一个 Node;
  • 你走到队尾,拍拍前一个人的肩膀(prev 指向他),说:“兄弟,我排你后面。”
  • 前一个人也转头确认一下(next 指向你)。

这个队列被设计成双向的,这样的话,如果你排队排一半不想排了(超时或被中断),你可以告诉前后的兄弟:“我撤了,你们直接连上吧。”

在 AQS 中,入队是自荐(CAS 抢 tail),出队是上位(抢锁成功后替代 head)。

  1. 入队:新节点通过 casTail 实现多线程下的有序排队,prev 链接是入队的通行证。
  2. 阻塞:入队后,线程在 for(;;) 循环中通过 LockSupport.park 闭目养神。
  3. 出队:前驱释放锁并 unpark 后继。后继苏醒,抢锁成功,通过 head = node 完成出队,老 head 被 GC 回收。

入队流程 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 当线程抢锁失败,或者在 Condition 唤醒后,会调用此方法进入同步队列。
final void enqueue(Node node) {
if (node != null) {
for (;;) { // 自旋:保证在高并发竞争下最终成功入队
Node t = tail;
node.setPrevRelaxed(t); // 1. 设置前驱:由于还没入队,此处不需要屏障
if (t == null) // 2. 懒加载:如果队列为空,先初始化 Head
tryInitializeHead();
else if (casTail(t, node)) { // 3. 抢占队尾:原子地将自己设为新的 tail
t.next = node; // 4. 建立后继连接
if (t.status < 0) // 5. 特殊处理:如果前驱已取消,唤醒自己去清理
LockSupport.unpark(node.waiter);
break;
}
}
}
}

获取锁过程中的入队:acquire 里的隐式入队

1
2
3
4
5
6
7
8
9
10
11
12
// acquire 方法中的片段
else if (pred == null) { // 尚未入队
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // 关联前驱
if (t == null)
tryInitializeHead();
else if (!casTail(t, node)) // CAS 抢占队尾
node.setPrevRelaxed(null); // 失败了回退,下次循环重试
else
t.next = node; // 成功了,正式连接到链表
}

出队流程:acquire 成功后的 “晋升”。在 AQS 中,并没有一个专门叫 dequeue 的方法。所谓 “出队”,其实是排在队首的节点抢锁成功后,将自己设为新的 head,并断开与旧 head 的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 核心源码 (在 acquire 循环内)
// 晋升机制:当你(Thread-B)被唤醒并抢到锁后,你会执行 head = node。此时,原来的 head(Thread-A)就被踢出了队列。
// 你变成了新的 head,但为了保持队列结构,你的 waiter(线程引用)会被置空,变成一个新的“虚拟头节点”。
if (acquired) { // 抢锁成功(tryAcquire 返回 true)
if (first) { // 只有当前驱是 head 的节点才能“出队”
node.prev = null; // 1. 断开与前驱(旧head)的连接
head = node; // 2. 自己上位成为新的 head
pred.next = null; // 3. 协助 GC 回收旧 head
node.waiter = null; // 4. head 节点不存储线程(Dummy Node 逻辑)
// ... 其他逻辑
}
return 1;
}

释放唤醒:signalNext(Node h)

1
2
3
4
5
6
7
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING); // 原子地清除等待状态
LockSupport.unpark(s.waiter); // 拍拍肩膀:兄弟,醒醒,该你了
}
}


Step3:阻塞逻辑 - LockSupport(闭目养神)

线程在 AQS 里的生活就像是:“醒了抢锁 -> 抢不到继续睡 -> 睡醒了再抢”。这种机制被称为 “自旋 + 阻塞” 的混合模式,兼顾了低延迟和低 CPU 损耗。

排好队后,线程不会一直盯着窗口看(那太累了,浪费 CPU),而是会进入 “挂起” 状态。逻辑:

  • 你会问前一个人:“你是老大(Head)吗?”
  • 如果是,你会最后尝试抢一次钥匙(万一老大办完事刚走呢)。
  • 如果抢不到,你就在自己位子上坐下,闭上眼睛睡觉(LockSupport.park()),把自己的状态设为 WAITING。

阻塞逻辑的核心源码:这段逻辑嵌套在 acquire 方法的 for(;;) 循环中。请看 Java 17 的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ... 前期尝试抢锁失败,且已入队 ...

else if (node.status == 0) { // 当一个 Node 被创建并加入队列时,其 status 默认就是 0,它表示 “既没有取消,也没有进入正式阻塞预备” 的中间状态
// 1. 尝试将状态改为 WAITING (1)
// 只有状态变为 WAITING,前驱释放锁时才会信号通知我
node.status = WAITING;
} else {
// 2. 准备阻塞
long nanos;
// 计算自旋次数(为了减少 park/unpark 的上下文切换开销,新版 AQS 会先短时间自旋)
spins = postSpins = (byte)((postSpins << 1) | 1);

if (!timed)
// 【核心点】调用 LockSupport 挂起当前线程,不再向下执行
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
// 如果是带超时的获取,则挂起指定时间
LockSupport.parkNanos(this, nanos);
else
break;

// 3. 【线程苏醒点】
// 当其它线程调用 unpark(this) 后,线程会从这里醒来
node.clearStatus(); // 醒来后第一件事:清除 WAITING 状态

// 检查是否是因为被中断才醒来的
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
  • 新建节点:status = 0。
  • 准备阻塞:线程在 acquire 循环里执行 node.status = WAITING。
  • 进入睡眠:执行 LockSupport.park()。
  • 被唤醒:执行 node.clearStatus(),状态回滚到 0。
  • 出现意外:如果等待过程中被中断,执行 node.status = CANCELLED。


Step4:唤醒逻辑 - 叫号系统

当窗口的人(Thread-A)办完事了:

  • 交还钥匙:把 state 改回 0。
  • 叫号:他看一眼身后(head.next)。如果发现你在睡觉,他会拍拍你的肩膀(LockSupport.unpark())。
  • 接班:你醒了,发现自己就在 head 后面,于是顺理成章地拿走钥匙,把 head 变成你自己。


手撕 AQS 独占模式

AQS 独占模式交互演示
volatile int state
0
ExclusiveOwnerThread
None
Sync Queue (CLH):
Head
等待操作...

实现自己的 AQS:MyAQS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.concurrent.locks.LockSupport;

/**
* @author KJ
* @description 实现自己的 AQS
*/
public abstract class MyAQS {

/*-------------------【1】核心骨架与 Unsafe 地址偏移-----------------*/
private static final Unsafe U;
private static final long STATE, HEAD, TAIL;
static {
try {
// 静态初始化 Unsafe 偏移量
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
U = (Unsafe) f.get(null);
STATE = U.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
HEAD = U.objectFieldOffset(MyAQS.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}

/*-----------------【2】volatile state 和 CAS update---------------*/
// 状态位:volatile 保证可见性
private volatile int state;
protected final int getState() { return state; }
protected final void setState(int s) { state = s; }

// 原子修改状态的方法(CAS)
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}


/*----------------------【3】定义 Node 与状态常量--------------------*/
static final int WAITING = 1;
static final int CANCELLED = -1;

static class Node {
volatile Node prev;
volatile Node next;
Thread waiter;
volatile int status;

Node() {} // 用于 Head 节点
Node(Thread waiter) {
this.waiter = waiter;
}

// 预先计算好偏移量
private static final long PREV;
private static final long NEXT;
private static final long STATUS;
static {
try {
// JDK 17 中获取偏移量的标准姿势
PREV = U.objectFieldOffset(Node.class.getDeclaredField("prev"));
NEXT = U.objectFieldOffset(Node.class.getDeclaredField("next"));
STATUS = U.objectFieldOffset(Node.class.getDeclaredField("status"));
} catch (NoSuchFieldException e) {
throw new ExceptionInInitializerError(e);
}
}
// 入队时调用
final void setPrevRelaxed(Node p) {
// JDK 17 中 putObject 依然可用,但建议明确偏移量
U.putObject(this, PREV, p);
}
}

// 队列头尾指针
private transient volatile Node head;
private transient volatile Node tail;


/*------------【4】自旋&阻塞:灵魂逻辑-尝试抢锁-抢不到就入队睡觉----------*/
public final void acquire(int arg) {
// 1. 尝试直接抢锁 (非公平体现)
if (!tryAcquire(arg)) {
// 2. 抢不到,构造节点并入队
Node node = enqueue();
boolean interrupted = false; // 引入局部变量追踪中断状态
// 3. 进入自旋+阻塞循环
for (;;) {
Node pred = node.prev;
// 如果前驱是 head,说明轮到我试一下了
if (pred == head && tryAcquire(arg)) {
// 抢锁成功,自己变成新的 head (出队)
setHead(node);
// 助力 GC 回收旧 head
pred.next = null;

//【补偿中断】如果在等待过程中被中断过,但抢锁成功了,我们要把中断状态补回去
if (interrupted) {
// AQS 的设计目标是 “不响应中断”,这意味着即便你中断了我,我也要坚持排队直到拿到锁。
// 但 Java 线程有一个原则:你不处理中断,也要保留中断的证据。
// 这样,用户在调用 lock.lock() 之后,紧接着调用 curThread.isInterrupted() 依然能得到 true,从而由用户决定是否要处理这个信号。
Thread.currentThread().interrupt();
}
return;
}
// 4. 准备睡觉
if (node.status == 0) {
node.status = WAITING; // 挂上 “请叫醒我” 的牌子
} else {
LockSupport.park(this); // 闭目养神

// 醒来后检查:是不是因为中断才醒的?
if (Thread.interrupted()) { // 当前线程标记 volatile boolean interrupted
//【注】
// thead.interrupt():发信号:给目标线程打上中断标记。结果 interrupted 被设为 true。
// thread.isInterrupted():看信号:查看目标线程是否中断,不影响标记。对 interrupted 无影响。
// Thread.interrupted():清信号:查看当前线程是否中断,并重置标记。结果 interrupted 被设为 false。
interrupted = true;
}
node.status = 0; // 被唤醒后重置状态,继续循环抢锁
}
}
}
}

/*-------------------【5】释放逻辑 release (叫号系统)------------------*/
public final void release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.next != null) {
signalNext(h); // 唤醒后继节点
}
}
}

private Node enqueue() {
Node node = new Node(Thread.currentThread());
// 自旋:如果 CAS 失败了,说明有人抢先一步,我得重新尝试,直到成功。
for (;;) {
// 获取当前的队尾
Node t = tail;
// 如果 tail 为空,说明我是第一个排队的人,此时队列还没建立
if (t == null) {
// 创建一个虚拟头节点
Node h = new Node();
// 尝试把这个新节点设为 HEAD
if (U.compareAndSwapObject(this, HEAD, null, h)) {
// 成功后,尾巴也指向它。注意这里没有 break,会进入下一次循环去挂载自己。
tail = h;
}
} else {
// 设置 node 的前驱为当前的 tail(由于存在执行间隙,所以从 tail 往前找是唯一可靠的)
node.setPrevRelaxed(t);
// 尝试把 TAIL 指针从 t 改为我自己(node)
if (U.compareAndSwapObject(this, TAIL, t, node)) {
// 成功则把老 tail 的 next 指向我,双向链接达成
// 注意在进入分支和执行之前,没有任何其他线程能替你执行 t.next = node,你拥有绝对的执行权。
// 但是,你必须接受在这一行代码执行完之前,从 head 往后看的视角里,这个队列是“断”的。
t.next = node;
// 退出循环,入队成功
return node;
}
}
// 如果执行到这里,说明 CAS 失败,别的线程抢了 tail,继续 for 循环。
}
}

private void setHead(Node node) {
head = node;
node.waiter = null;
node.prev = null;
}

private void signalNext(Node h) {
Node s = h.next;
//【深度优化】:处理 “断裂时刻”
// 如果 s 为空,或者 s 的状态是 CANCELLED (负数),我们需要从后往前找
if (s == null || s.status < 0) {
s = null;
// 从 tail 开始向前遍历,直到找到离 h 最近的一个合法节点
for (Node t = tail; t != null && t != h; t = t.prev) {
if (t.status >= 0) { // 发现非取消节点
s = t;
}
}
}
// 唤醒逻辑
if (s != null) {
// 原子清除 WAITING 标记,如果清除成功(说明之前确实在等),则唤醒
if (U.compareAndSwapInt(s, Node.STATUS, WAITING, 0)) {
LockSupport.unpark(s.waiter);
}
}
}

/*-------------------【x】留给子类实现的钩子方法------------------*/
protected abstract boolean tryAcquire(int arg);
protected abstract boolean tryRelease(int arg);
}

编写简单的 lock 验证 AQS 的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* @author KJ
* @description 编写简单的 lock 验证 AQS 的逻辑
*/
public class MyLock extends MyAQS {

public void lock() {
acquire(1);
}

public void unlock() {
release(1);
}

@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}

@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}

public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
int[] count = {0};

Thread t1 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
lock.lock();
count[0]++;
lock.unlock();
}
});

Thread t2 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
lock.lock();
count[0]++;
lock.unlock();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("最终结果 (应为 200000): " + count[0]); // 200000
}
}


AQS 共享模式又当如何?

共享模式最复杂的地方在于 “唤醒传播”(Propagating):在独占模式下,一个人出去了,只需叫醒后继的一个人;而在共享模式下(如 SemaphoreCountDownLatch),如果满足条件,一个人的释放会引发连锁反应,瞬间叫醒队列中所有排队的共享线程。

为了增加共享模式的逻辑,我们需要区分节点类型。共享节点不仅要能排队,还要知道自己是 “共享” 的。完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.concurrent.locks.LockSupport;

/**
* @author KJ
* @description 实现自己的 AQS
*/
public abstract class MyAQS {

/*-------------------【1】核心骨架与 Unsafe 地址偏移-----------------*/
private static final Unsafe U;
private static final long STATE, HEAD, TAIL;
private static final long NODE_STATUS, NODE_NEXT, NODE_PREV; // 将 Node 的偏移量也拿到外层来
static {
// 为了彻底解决唤醒传播中断,请将所有偏移量(包括 Node 的)全部移动到顶层静态块中,并确保 Node 的状态字段是 volatile 的。
try {
// 静态初始化 Unsafe 偏移量
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
U = (Unsafe) f.get(null);

STATE = U.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
HEAD = U.objectFieldOffset(MyAQS.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset(MyAQS.class.getDeclaredField("tail"));

// 统一管理 Node 偏移量
NODE_STATUS = U.objectFieldOffset(Node.class.getDeclaredField("status"));
NODE_NEXT = U.objectFieldOffset(Node.class.getDeclaredField("next"));
NODE_PREV = U.objectFieldOffset(Node.class.getDeclaredField("prev"));
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}

/*-----------------【2】volatile state 和 CAS update---------------*/
// 状态位:volatile 保证可见性
private volatile int state;
protected final int getState() { return state; }
protected final void setState(int s) { state = s; }

// 原子修改状态的方法(CAS)
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}


/*----------------------【3】定义 Node 与状态常量--------------------*/
static final int WAITING = 1;
static final int CANCELLED = -1;
static final int PROPAGATE = -2;

static class Node {
volatile Node prev;
volatile Node next;
volatile int status;
volatile Thread waiter;
volatile Node nextWaiter;

// 模式常量
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;


Node() {}
Node(Thread waiter, Node mode) {
this.waiter = waiter;
this.nextWaiter = mode;
}

final boolean isShared() {
// 只要不是独占模式(null),就认为是共享模式
return nextWaiter != EXCLUSIVE;
}

// 入队时调用
final void setPrevRelaxed(Node p) {
// JDK 17 中 putObject 依然可用,但建议明确偏移量
U.putObject(this, NODE_PREV, p);
}
}

// 队列头尾指针
private transient volatile Node head;
private transient volatile Node tail;


/*------------【4.1】尝试抢锁----------*/

/**
* 独占模式抢锁
*/
public final void acquire(int arg) {
// 1. 尝试直接抢锁 (非公平体现)
if (!tryAcquire(arg)) {
// 2. 抢不到,构造节点并入队
Node node = enqueue(Node.EXCLUSIVE);
boolean interrupted = false; // 引入局部变量追踪中断状态
// 3. 进入自旋+阻塞循环
for (;;) {
Node pred = node.prev;
// 如果前驱是 head,说明轮到我试一下了
if (pred == head && tryAcquire(arg)) {
// 抢锁成功,自己变成新的 head (出队)
setHead(node);
// 助力 GC 回收旧 head
pred.next = null;

//【补偿中断】如果在等待过程中被中断过,但抢锁成功了,我们要把中断状态补回去
if (interrupted) {
// AQS 的设计目标是 “不响应中断”,这意味着即便你中断了我,我也要坚持排队直到拿到锁。
// 但 Java 线程有一个原则:你不处理中断,也要保留中断的证据。
// 这样,用户在调用 lock.lock() 之后,紧接着调用 curThread.isInterrupted() 依然能得到 true,从而由用户决定是否要处理这个信号。
Thread.currentThread().interrupt();
}
return;
}
// 4. 准备睡觉
if (shouldParkAfterFailedAcquire(pred, node)) {
LockSupport.park(this);
if (Thread.interrupted()) { // 醒来后检查:是不是因为中断才醒的?
//【注】
// thead.interrupt():发信号:给目标线程打上中断标记。结果 interrupted 被设为 true。
// thread.isInterrupted():看信号:查看目标线程是否中断,不影响标记。对 interrupted 无影响。
// Thread.interrupted():清信号:查看当前线程是否中断,并重置标记。结果 interrupted 被设为 false。
interrupted = true;
}
}
}
}
}

/**
* 共享模式抢锁
*/
public final void acquireShared(int arg) { //
// tryAcquireShared 由子类实现:返回负数表示失败,0或正数表示成功
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}

private void doAcquireShared(int arg) {
final Node node = enqueue(Node.SHARED); // 入队并标记为 SHARED
boolean interrupted = false;
try {
for (;;) {
final Node p = node.prev;
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 【核心】上位并尝试唤醒后继
setHeadAndPropagate(node, r);
node.next = null; // help GC
if (interrupted) {
Thread.currentThread().interrupt();
}
return;
}
}
// 阻塞逻辑与独占模式一致
if (shouldParkAfterFailedAcquire(p, node)) {
LockSupport.park(this);
if (Thread.interrupted()) {
interrupted = true;
}
}
}
} catch (Throwable t) {
// 实际 AQS 这里会有 cancelAcquire(node);
throw t;
}
}

private Node enqueue(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 自旋:如果 CAS 失败了,说明有人抢先一步,我得重新尝试,直到成功。
for (;;) {
// 获取当前的队尾
Node t = tail;
// 如果 tail 为空,说明我是第一个排队的人,此时队列还没建立
if (t == null) {
// 创建一个虚拟头节点
Node h = new Node();
// 尝试把这个新节点设为 HEAD
if (U.compareAndSwapObject(this, HEAD, null, h)) {
// 成功后,尾巴也指向它。注意这里没有 break,会进入下一次循环去挂载自己。
tail = h;
}
} else {
// 设置 node 的前驱为当前的 tail(由于存在执行间隙,所以从 tail 往前找是唯一可靠的)
node.setPrevRelaxed(t);
// 尝试把 TAIL 指针从 t 改为我自己(node)
if (U.compareAndSwapObject(this, TAIL, t, node)) {
// 成功则把老 tail 的 next 指向我,双向链接达成
// 注意在进入分支和执行之前,没有任何其他线程能替你执行 t.next = node,你拥有绝对的执行权。
// 但是,你必须接受在这一行代码执行完之前,从 head 往后看的视角里,这个队列是“断”的。
t.next = node;
// 退出循环,入队成功
return node;
}
}
// 如果执行到这里,说明 CAS 失败,别的线程抢了 tail,继续 for 循环。
}
}

private void setHead(Node node) {
head = node;
node.waiter = null;
node.prev = null;
// 注意:不要清空 nextWaiter,因为 isShared() 依赖它判断传播模式
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);

// 原版 AQS 的逻辑非常严密:
// 只要有剩余资源 (propagate > 0),或者旧的/新的 head 状态显示需要传播
if (propagate > 0 || h == null || h.status < 0 ||
(h = head) == null || h.status < 0) {

Node s = node.next;
// 【核心修复】:如果 s == null,绝对不能停止,必须去 doReleaseShared
// 因为 s == null 可能意味着后继节点正在入队,next 指针还没连上
if (s == null || s.isShared()) {
doReleaseShared();
}
}
}

private boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 判定是否应该阻塞:只有前驱节点状态为 WAITING 时,当前线程才安心睡觉。
int ws = pred.status;
if (ws == WAITING) return true;
if (ws > 0) {
// 前驱被取消了,跳过它
do {
node.prev = pred = pred.prev;
} while (pred.status > 0);
pred.next = node;
} else {
// 将前驱状态设为 WAITING,暗示它待会叫醒我
U.compareAndSwapInt(pred, NODE_STATUS, ws, WAITING); // 使用顶层定义的偏移量 NODE_STATUS
}
return false;
}


/*-------------------【5】释放逻辑 release (叫号系统)------------------*/
public final void release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// if (h != null && h.next != null) {
if (h != null && h.status != 0) { // 修复点:只要 head 不为空且状态不是 0,就说明可能有人在排队,必须尝试唤醒
signalNext(h); // 唤醒后继节点
}
}
}

public final void releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
}
}

private void doReleaseShared() {
for (;;) {
Node h = head;
// 队列里至少要有两个节点(虚拟头 + 等待者)才需要唤醒
if (h != null && h != tail) {
int ws = h.status;
if (ws == WAITING) {
// 必须 CAS 成功才唤醒,防止并发下多次 signalNext
if (!U.compareAndSwapInt(h, NODE_STATUS, WAITING, 0)) {
continue;
}
signalNext(h);
}
// 如果状态已经是 0,说明唤醒正在进行中,我们需要把它设为 PROPAGATE,确保后续的 setHeadAndPropagate 能读到这个负数状态,从而继续传播
else if (ws == 0 && !U.compareAndSwapInt(h, NODE_STATUS, 0, PROPAGATE)) { // 如果 head 没变,说明没有新节点入队,传播可以暂时结束。-2 对应 PROPAGATE
continue;
}
}
// 只有当 head 绝对没有变动时,才认为这一轮唤醒任务完成了
if (h == head) {
break;
}
}
}

private void signalNext(Node h) {
Node s = h.next;
//【深度优化】:处理 “断裂时刻”
// 如果 s 为空,或者 s 的状态是 CANCELLED (负数),我们需要从后往前找
if (s == null || s.status < 0) {
s = null;
// 从 tail 开始向前遍历,直到找到离 h 最近的一个合法节点
for (Node t = tail; t != null && t != h; t = t.prev) {
if (t.status >= 0) { // 发现非取消节点
s = t;
}
}
}
// 唤醒逻辑
if (s != null) {
LockSupport.unpark(s.waiter);
}
}

/*-------------------【x】留给子类实现的钩子方法------------------*/
protected abstract boolean tryAcquire(int arg);
protected abstract boolean tryRelease(int arg);
protected abstract int tryAcquireShared(int arg); // 返回负数表示获取失败、返回0表示获取成功但不需要传播、返回正数表示获取成功且可以叫醒后继共享节点
protected abstract boolean tryReleaseShared(int arg); // 返回 true:释放成功,可能需要叫醒后继
}

对应的 MyLock 更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* @author KJ
* @description 编写简单的 lock 验证 AQS 的逻辑
*/
public class MyLock extends MyAQS {
// 记录当前占用锁的线程,这是实现排他锁的关键,volatile 保证多线程可见性
private volatile Thread exclusiveOwnerThread;

public void lock() {
acquire(1);
}

public void unlock() {
release(1);
}

@Override
protected boolean tryAcquire(int arg) {
Thread current = Thread.currentThread();
int s = getState();
if (s == 0) {
if (compareAndSetState(0, 1)) {
exclusiveOwnerThread = current;
return true;
}
} else if (current == exclusiveOwnerThread) {
// 支持重入
setState(s + arg);
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != exclusiveOwnerThread) {
throw new IllegalMonitorStateException();
}
int nextc = getState() - arg;
if (nextc == 0) {
// 先清空 owner,再改 state
exclusiveOwnerThread = null;
setState(nextc);
return true;
}
setState(nextc);
return false;
}

@Override
protected int tryAcquireShared(int arg) {
return 0;
}

@Override
protected boolean tryReleaseShared(int arg) {
return false;
}

public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
int[] count = {0};

Thread t1 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
lock.lock();
count[0]++;
lock.unlock();
}
});

Thread t2 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
lock.lock();
count[0]++;
lock.unlock();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("最终结果 (应为 200000): " + count[0]); // 200000
}
}

用我们的 MyAQS 实现一个门栓所(CountDownLatch 简化版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* @author KJ
* @description 实现一个简易版“发车器”
*/
public class MyCountDownLatch {

private static class Sync extends MyAQS {
Sync(int count) {
setState(count);
}

/**
* 共享模式获取:
* 返回 1 (>=0) 表示门闩已开,线程可以通行。
* 返回 -1 (<0) 表示门闩未开,线程需要入队阻塞。
*/
@Override
protected int tryAcquireShared(int arg) {
// AQS 规范:返回正数表示不仅我成功了,后面的人可能也能成功
// 只要计数器归零,我们就返回 1,给 setHeadAndPropagate 充足的动力
return (getState() == 0) ? 1 : -1;
}

/**
* 共享模式释放:
* 每调用一次,state 减 1。
* 只有当 state 减到 0 的那一刻,才返回 true,触发 MyAQS 的 doReleaseShared。
*/
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int c = getState();
if (c == 0) {
return false; // 已经归零了,不需要再释放
}
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
// 只有当计数器变为 0 时,才返回 true 以触发唤醒传播
return nextc == 0;
}
}
}

// 独占模式钩子,对于 Latch 来说不需要实现
@Override
protected boolean tryAcquire(int arg) { return false; }
@Override
protected boolean tryRelease(int arg) { return false; }
}

private final Sync sync; // 内部同步器
public MyCountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

/**
* 等待计数归零
*/
public void await() {
sync.acquireShared(1);
}

/**
* 计数减一
*/
public void countDown() {
sync.releaseShared(1);
}

public static void main(String[] args) throws InterruptedException {
int runnerCount = 5;
MyCountDownLatch latch = new MyCountDownLatch(1); // 初始计数为 1

for (int i = 0; i < runnerCount; i++) {
final int id = i;
new Thread(() -> {
System.out.println("运动员 " + id + " 已就位,等待发令枪...");
latch.await(); // 所有运动员在此阻塞
System.out.println("运动员 " + id + " 冲出起跑线!");
}).start();
}

Thread.sleep(2000); // 模拟准备时间
System.out.println("--- 砰!发令枪响 ---");
latch.countDown(); // 计数归零,唤醒所有线程
}
}

测试结果:

1
2
3
4
5
6
7
8
9
10
11
运动员 2 已就位,等待发令枪...
运动员 0 已就位,等待发令枪...
运动员 3 已就位,等待发令枪...
运动员 4 已就位,等待发令枪...
运动员 1 已就位,等待发令枪...
--- 砰!发令枪响 ---
运动员 0 冲出起跑线!
运动员 1 冲出起跑线!
运动员 3 冲出起跑线!
运动员 2 冲出起跑线!
运动员 4 冲出起跑线!

你可以将独占模式 (Exclusive Mode) 想象成只有一个坑位的单人洗手间(ReentrantLock),它的核心逻辑是:一次只能进一个人,其他人只能在门口排队。主要规则是:

  • 抢占 (Lock):门锁着吗?没锁我就冲进去,反手把门锁上(CAS 修改 state 为 1)。
  • 重入 (Reentrancy):如果你已经在里面了,你还可以进出洗手间里的隔断(state 从 1 变成 2),毕竟里面只有你。
  • 阻塞 (Park):如果门锁着,外面的人(Thread B/C)只能在门口排队(进入 CLH 队列),并且闭目养神(LockSupport.park)。
  • 唤醒 (Unlock):你出来时,必须彻底打开门锁(state 归零),然后拍拍后面那个人的肩膀:“喂,该你了。”(unparkSuccessor)。

而对共享模式 (Shared Mode),可以把它想象成电影院的检票闸机(Semaphore 或 CountDownLatch),它的核心逻辑是:只要票没检完,大家可以一起进,且一个人进去了会招呼后面的人赶紧进。主要规则是:

  • 获取 (Acquire):闸机显示还有 5 张票(state = 5)。
  • 传播 (Propagate) —— 这是最关键的区别:第一个人刷票进去后,他不会像独占锁那样进完就关门,而是会回头招手喊:票还有,后面的快来!(setHeadAndPropagate)。
  • 并发执行:只要票没发完,Thread A、B、C 可以同时在放映厅里看电影。他们之间不是互斥的,而是共享这个资源。
  • 释放 (Release):当一个观众看完出来(state++),他释放了一个空位,这同样可能引发 “连锁反应”,唤醒还在门口等票的人。

总结一句话:独占锁是 “我锁门,你等着”,强调的是绝对的安全;共享锁是 “我进门,你跟上”,强调的是极致的效率。


CAS 和 LockSupport

以上我们已经知道 AQS 的根基是 volatile state 以及 CAS and LockSupport 操作。我们现在将目光移到 CAS 和 LockSupport,打破砂锅问到底,这 TM 究竟都是什么东西?

CAS 的底层实现

当你在 Java 中调用 compareAndSetInt 时,HotSpot C++ 代码最终会生成一条特定的 CPU 指令。在 x86 架构下,CAS 使用 lock cmpxchg 指令来实现。这条指令会触发总线锁(Bus Lock)或缓存锁(Cache Lock,告诉 CPU:在这个指令执行期间,锁定这块内存地址,任何其他核心都不准改它。所以 CAS 的原子性并不是靠软件实现的,而是靠 CPU 硬件电路保证的。

1
2
3
4
5
6
public final class Unsafe {
// ...
@IntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset, int expected, int x); // Java 代码 → Unsafe (JNI) → C++ Atomic → CPU lock cmpxchg 指令。
// ...
}

第一层:JNI 映射 (unsafe.cpp)。源码路径 src/hotspot/share/prims/unsafe.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
// 1. 将 Java 对象引用转换为 JVM 内部的 oop (Ordinary Object Pointer)
oop p = JNIHandles::resolve(obj);

// 2. 根据偏移量计算出变量在内存中的实际物理地址
// p 是对象起始地址,offset 是我们在 Java 层预先计算好的偏移量
void* addr = index_oop_from_field_offset_long(p, offset);

// 3. 调用 Atomic 类的静态方法进行原子比较并交换
// addr: 目标地址, e: 预期值 (expect), x: 更新值 (update)
// 返回值是交换前的值,如果返回 e,说明交换成功
return Atomic::cmpxchg((jint*)addr, e, x) == e;
} UNSAFE_END

第二层:原子操作抽象层 (atomic.hpp)。JVM 为了跨平台,定义了一个 Atomic 类。它会根据不同的操作系统和 CPU 架构(如 x86, ARM, RISC-V)选择不同的实现。

1
2
3
4
5
// 这是一个模板方法,最终会路由到具体平台的实现
template <typename T, typename D, typename U>
inline D Atomic::cmpxchg(D* dest, T compare_value, U exchange_value) {
return PlatformCmpxchg<sizeof(D)>()(dest, compare_value, exchange_value);
}

第三层:硬件级别实现 (以 x86 为例)。这是最关键的一层。在 Linux x86 架构下,JVM 会直接写一段内联汇编。源码路径:src/hotspot/os_cpu/linux_x86/atomic_linux_x86.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<>
template<typename T>
inline T Atomic::PlatformCmpxchg<4>::operator()(T* dest, T compare_value, T exchange_value) const {
// 这里的 __asm__ 表示内联汇编
// 而 volatile 则告诉编译器不要优化这段代码
__asm__ volatile (

// lock 前缀:硬件锁,lock 前缀的作用是:
//// 总线锁/缓存锁:在执行该指令时,锁定北桥信号或对应的缓存行(Cache Line)。
//// 禁止重排序:它具有 Memory Barrier(内存屏障) 的效果,确保之前的写操作对所有核心可见。

// cmpxchgl 指令:
//// l 代表 Long(在汇编里指 32 位整数,对应 Java 的 int)。
//// 它的逻辑是:比较 eax 寄存器里的值(预期值)和内存地址里的值。如果相等,就把新值写入内存;如果不等,就把内存里的值读入 eax。

"lock cmpxchgl %1,(%3)" // 核心指令:lock cmpxchg
: "=a" (exchange_value) // 输出:将结果放入 eax 寄存器
: "r" (exchange_value), // 输入:%1,要交换的新值
"a" (compare_value), // 输入:eax,预期值
"r" (dest) // 输入:%3,目标内存地址
: "cc", "memory" // 损坏部分:告诉编译器内存和标志位已被修改
);
// 在汇编代码末尾的 "memory" 关键字非常重要。
// 它告诉编译器:“我这段代码直接改了内存,你之前缓存在寄存器里的数据可能失效了,请重新从内存读取。” 这就是 Volatile 语义 在底层的物理实现方式之一。
return exchange_value;
}

相比于传统的锁,CAS 的 “节省” 体现在避免了内核态切换(Context Switch)。

  • 传统锁(重量级):如果拿不到锁,CPU 会把当前线程挂起(Park),这涉及到从 “用户态” 切换到 “内核态”,保存寄存器快照,存入调度队列。这套流程需要消耗数千个 CPU 周期。
  • CAS(无锁):它只是一条普通的汇编指令。如果失败了,线程还在运行,只需在循环里再执行一次指令。在竞争不激烈时,它比 “挂起再唤醒” 快得多。
CAS(无锁)

底层物理原理:MESI 协议与总线仲裁。CAS 能够实现原子性且高效,依赖的是 CPU 硬件内部的缓存一致性协议,最常见的是 MESI 协议。在早期的 CPU 中,lock 前缀真的会锁住总线(Bus),导致其他核心无法访问内存,这确实很浪费。但现代 CPU 使用的是缓存锁定:

  • 当一个核心要执行 lock cmpxchgl 时,它会尝试获取该内存地址所在的缓存行(Cache Line)的独占权(Exclusive)。
  • 如果该缓存行已经在其 L1/L2 缓存中,CPU 只需在内部标记该行被锁定,而不需要去锁总线。

真正物理成本在硬件冲突判定,这是 CAS 真正消耗资源的地方(电子信号在总线上的往返时间):

  • 在执行 lock cmpxchgl 指令时,Core 1 的指令流水线(Pipeline)会卡住。
    • Core 1 发出 RFO。
    • 它必须等待总线仲裁,等待 Core 2/3 的 ACK 回执。这个过程可能消耗几十或上百个时钟周期(几十纳秒),相比传统的重量级锁动不动消耗几千甚至上万个时钟周期(~10 微秒),效率有百倍的提升。
    • 在此期间,Core 1 的这个特定执行单元是停滞的。这就是物理意义上的消耗。
  • 如果 RFO 拿到了,执行了 cmpxchg,但发现内存里的值已经被别人改了(Compare 失败):

    • 指令结束:这条 cmpxchg 指令就算执行完了,它会返回一个 “失败” 的结果。
    • Java层面:在我们的 MyAQS 自旋锁里,你会看到一个 for(;;)。CAS 失败后,程序会立刻发起下一次循环,重新尝试执行指令。
  • 如果 Core 1 和 Core 2 同时发送 RFO 想改同一个地址,总线仲裁器(Bus Arbiter)会根据算法(如轮询或固定优先级)判定谁赢。输掉的核心必须撤回请求,等赢家改完并释放缓存行后,再重新发起 RFO。在高并发下,大量的 CPU 周期都浪费在了 “等回执” 和 “等仲裁” 上。虽然 CPU 占用率看着是 100%,但实际在做有用功(有效指令执行)的时间比例很低。这就是总线风暴(Bus Storm)。

传统重量级锁:钱花在哪了?

用户态到内核态的切换 (Trap):当 synchronized 升级为重量级锁,或者 ReentrantLock 竞争失败导致线程挂起时,JVM 会调用操作系统的底层接口(如 Linux 的 futex)。

  • CPU 必须停止当前指令流,执行一条 syscall 指令。
  • 硬件要进行特权级检查。

上下文切换 (Context Switch):这是最沉重的负担。

  • 保存现场:把当前线程的寄存器值、程序计数器(PC)、堆栈指针(SP)存入内存(PCB 进程控制块)。
  • 调度决策:操作系统内核决定下一个该谁运行。
  • 恢复现场:从内存读取另一个线程的状态,装载进寄存器。
  • TLB/缓存失效:新的线程运行在不同的内存地址空间,CPU 的 TLB(地址转换快照) 和 L1/L2 缓存可能全部作废,导致新线程刚开始跑的时候极慢。

需要注意:CAS 也不是所有场景下都有优势。如果互斥方法执行时间较长(排队时间很长),CAS 情况下 CPU 会一直处于 100% 的忙碌状态,做无效的自旋。如果 100 个线程自旋 1 毫秒,消耗的能量和对总线的压力是灾难性的。而对重量级锁而言,虽然切换的那一下很贵,但一旦线程睡着了,它完全不消耗 CPU 资源。CAS 适合 ‘短平快’ 的场景,即预期竞争很快就能结束。如果你预见到这把锁会被持有很多秒,请毫不犹豫地使用阻塞锁(重量级锁),因为保护 CPU 的‘体力’(时钟周期)比那点切换开销更重要。


LockSupport 的底层实现

实际上,LockSupport 也是 Unsafe 的 “高级包装”,而 Unsafe 是 JVM 留给 Java 通往 C++ 与操作系统的 “后门”。翻开 LockSupport 的源码,你会发现它的每一个核心方法(park, unpark)最终都调用了 Unsafe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LockSupport {
// ...
private static final Unsafe U = Unsafe.getUnsafe();
private static final long PARKBLOCKER = U.objectFieldOffset(Thread.class, "parkBlocker");
private static final long TID = U.objectFieldOffset(Thread.class, "tid");

// LockSupport 源码片段
public static void park(Object blocker) {
Thread t = Thread.currentThread();
U.putReferenceOpaque(t, PARKBLOCKER, blocker); // 1. 先把当前线程的 “病历本” 填好(设置 parkBlocker,通常是 AQS 实例或 Lock 实例)
U.park(false, 0L); // 2. 调用 Parker 的底层方法真正挂起线程
U.putReferenceOpaque(t, PARKBLOCKER, null); // 3. 醒来后把当前线程的 “病历本” 清空
}

public static void unpark(Thread thread) {
if (thread != null) {
U.unpark(thread);
}
}
// ...
}

执行链条:

  • Java 层调用 U.park()。
  • JVM 根据方法名找到对应的 C++ 实现函数(在 HotSpot 源码中通常是 Unsafe_Park)。
  • C++ 函数内部获取当前线程对应的 C++ Parker 对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package jdk.internal.misc;

/**
* 而 Unsafe 并不是用 Java 实现的,它的方法都被标记为 native。这意味着它的逻辑是在 JVM(HotSpot)的 C++ 源码中实现的。
*
* 为什么 Unsafe 能直接操作内存?
* JNI 桥梁:Unsafe 通过 Java Native Interface (JNI) 进入 C++ 环境。
* 地址寻址:C++ 具有操作物理地址的能力。当 Java 传入一个对象引用和 Offset(偏移量)时,C++ 代码直接将它们相加:TargetAddress = ObjectAddress + Offset。
* 直接读写:计算出地址后,直接使用指针(*(int*)TargetAddress)进行存取,绕过了 JVM 的所有安全检查和字段访问限制。
*/
public final class Unsafe {
// ...
@IntrinsicCandidate
public native void park(boolean isAbsolute, long time);

@IntrinsicCandidate
public native void unpark(Object thread);
// ...
}

park 和 unpark 在 C++ 层面是基于线程级许可机制实现的每个 Java 线程在 JVM 内部都关联一个 Parker 实例。它的核心是一个计数器 _counter(即我们常说的 Permit)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// 在 Linux/Unix 环境下,无论 JVM 的 ObjectMonitor(synchronized 的核心)还是 Parker(LockSupport.park 的核心),最终都会调用 POSIX Threads 库提供的同步原语。
// 两者最终都是通过操作系统内核的 pthread_mutex_lock / pthread_mutex_unlock 来保证操作的原子性。
// 两者都使用了 pthread_cond_wait / pthread_cond_signal 这种“条件变量”机制来实现线程的挂起与唤醒。

// POSIX Threads 是 Unix/Linux 环境下多线程的标准接口,Pthreads 主要提供以下四种同步原语,它们也是 Java 中 synchronized、ReentrantLock 和 Object.wait/notify 的灵魂碎片。
// 之所以称其为 “原语”,是因为它们具备以下两个关键特性:
///// 原子性:这些操作是由硬件指令(如 CAS - Compare and Swap)和操作系统内核共同支撑的,不可被中断。
///// 不可分割性:它们是构建更复杂同步机制(如 Java 的 AQS、CountDownLatch、CyclicBarrier)的最小、最基本的单元。
// 另外,这四种同步原语是:
// A. 互斥锁 (Mutex - Mutual Exclusion),这是最常用的原语。它保证在同一时刻,只有一个线程能访问某个资源。它只有“锁定”和“解锁”两种状态。
///// 它的特性是谁上锁,谁解锁。如果一个线程试图获取已被锁定的 Mutex,它会被内核挂起(进入睡眠),直到锁被释放。JVM 中的 _mutex 本质上就是对 pthread_mutex_t 的封装。
// B. 条件变量 (Condition Variables),互斥锁解决了“谁能进”的问题,而条件变量解决了“什么时候进”。它允许线程在某个特定条件不满足时挂起自己,直到另一个线程发出信号(Signal)告诉它条件变了。
///// 核心操作:pthread_cond_wait(等待)和 pthread_cond_signal(唤醒)。注意条件变量必须配合互斥锁使用,以防止在“判断条件”和“进入等待”之间发生竞争。
// C. 读写锁 (Read-Write Locks),一种更精细的互斥锁。允许多个线程同时读资源,但在有线程写资源时,必须排他。应用于典型的“读多写少”场景,能极大地提升并发性能。
// D. 信号量 (Semaphores),虽然信号量技术上属于 POSIX 实时扩展(sem_init),但常与 Pthreads 配合。如果信号量的计数器 > 0,线程可以通过并将计数减 1;如果为 0,则等待。
///// 与 Mutex 不同,信号量不要求“谁加锁谁解锁”,因此常用于生产者-消费者模型。

// synchronized:底层对应的 C++ 类是 ObjectMonitor。它内部包含了 _WaitSet、_EntryList 和一个 _owner 字段,这些字段的修改都必须在 _mutex 的保护下进行。
// Parker:底层对应的 C++ 类就是 Parker。它内部维护了一个状态变量 _counter(即 permit),而对这个 _counter 的操作同样由一个 _mutex 保护。

// 虽然底层的“砖块”是一样的,但上层的“建筑结构”决定了它们完全不同的行为:
// synchronized (ObjectMonitor):管程模型 (Monitor),绑定在 对象 (Object) 上,notify 必须在 wait 之后才有意义(否则通知丢失),存在于 Object 的 Mark Word 指向的监视器中。
// park/unpark (Parker):许可模型 (Permit),绑定在 线程 (Thread) 上,unpark 可以在 park 之前执行,存在于每个线程私有的 Parker 实例中。

// 在最底层的操作系统(OS)层面两者依赖了 _mutex(互斥锁)或类似的同步原语,但它们的封装路径和设计意图完全不同。
// Synchronized Monitor = { Owner,WaitSet,EntryList,Mutex lock},它的逻辑是:“抢到锁的进屋,抢不到的在门口排队。”
// Parker (Permit) = { Counter ∈ [0,1],CondVar,Mutex state },它的逻辑是:“我这有一张票,你来了就拿走,没来我先留着,下次你一露面就能拿走。”

// 当一个线程执行 park() 时:
// 1.它需要先获取 _mutex。
// 2.检查 _counter 是否大于 0。
// 3.如果大于 0,消耗掉 permit 并立即返回;如果为 0,则进入 pthread_cond_wait 等待。
// 4.在这一系列判断和修改状态的过程中,必须加锁(即 _mutex),否则会出现并发竞争导致的逻辑错误。

/**
* Parker 的定义
*/
class Parker : public os::PlatformParker {
private:
volatile int _counter ; // 许可计数器:0 代表无许可,1 代表有许可
// ... 其他属性
public:
Parker() : _counter(0) {}
void park(bool isAbsolute, jlong time);
void unpark();
};


/**
* park 的核心实现 (os_linux.cpp)
* 核心逻辑:如果已经有许可,直接消费并返回;否则,进入等待。
*/
void Parker::park(bool isAbsolute, jlong time) {
// 1. 原子交换:如果 _counter > 0,说明之前有人给过 unpark 许可
// 直接把 _counter 设为 0 并返回,线程不需要阻塞
if (Atomic::xchg(0, &_counter) > 0) return;

Thread* thread = Thread::current();
assert(thread != NULL, "invariant");

// 2. 检查当前线程是否被中断,如果已被中断,直接返回
if (Thread::is_interrupted(thread, false)) return;

// 3. 进入底层的互斥锁区域
pthread_mutex_lock(_mutex);

// 4. 二次检查:如果在加锁期间有人给了许可,直接解锁返回
if (_counter > 0) {
_counter = 0;
pthread_mutex_unlock(_mutex);
return;
}

// 5. 真正的阻塞动作:调用操作系统的条件变量等待
// 这会让出 CPU,直到被 unpark 唤醒或超时
if (time == 0) {
pthread_cond_wait(_cond, _mutex);
} else {
// 处理带时间限制的 park (parkNanos)
os::Linux::safe_cond_timedwait(_cond, _mutex, &absTime);
}

// 6. 被唤醒后,将许可重置为 0
_counter = 0;
pthread_mutex_unlock(_mutex);
}



/**
* unpark 的核心实现 (os_linux.cpp)
* 核心逻辑:无论调用多少次 unpark,许可上限永远是 1。
*/
void Parker::unpark() {
int s ;
pthread_mutex_lock(_mutex);
s = _counter;
_counter = 1; // 无论之前是多少,都设为 1

if (s < 1) {
// 如果之前状态是 0,说明可能有人在睡觉,去叫醒它
pthread_cond_signal(_cond);
}

pthread_mutex_unlock(_mutex);
}
  • unpark(thread):将该线程的 Permit 置为 1。如果线程正在 park 阻塞中,通过底层操作系统的条件变量(Condition Variable)叫醒它。
  • park():检查 Permit。
    • 如果是 1,立刻消费掉它(变回 0)并返回(这就是为什么先 unpark 再 park 不会阻塞)。
    • 如果是 0,调用操作系统的 pthread_cond_wait(Linux/Unix)或 WaitForSingleObject(Windows)。


显示锁们

ReentrantLock 实际案例

解决超卖问题的库存扣减(并发资源竞争)

这是分布式系统或电商场景中最经典的案例。虽然现在常用 Redis 分布式锁,但在单机环境下,ReentrantLock 是性能和可靠性的首选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class StockService {
private int stock = 100; // 剩余库存
private final ReentrantLock lock = new ReentrantLock();

public void deductStock(int count) {
lock.lock(); // 阻塞获取,确保同一时间只有一个线程扣减
try {
if (stock >= count) {
// 模拟业务逻辑
Thread.sleep(10);
stock -= count;
System.out.println(Thread.currentThread().getName() + " 扣减成功,剩余:" + stock);
} else {
System.out.println(Thread.currentThread().getName() + " 库存不足");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 必须在 finally 中释放,防止异常导致死锁
}
}
}

带有超时的尝试锁(避免长时间阻塞)

在高性能接口中,我们不希望线程因为拿不到锁而无限期挂起(这会导致 Tomcat 线程池耗尽)。利用 tryLock() 可以实现 “拿不到就跑” 或者 “等一会再跑” 的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void tryOrder(long timeout, TimeUnit unit) {
// 尝试在指定时间内获取锁
try {
if (lock.tryLock(timeout, unit)) {
try {
// 执行下单逻辑
doCreateOrder();
} finally {
lock.unlock();
}
} else {
// 超过时间没拿到锁,直接给用户返回“系统繁忙”
throw new RuntimeException("系统繁忙,请稍后再试");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

多条件唤醒的阻塞队列(Condition 的精准接力)

这是 ReentrantLock 相比 synchronized 最强大的地方——支持多个 Condition。它允许你精准地唤醒某类线程(比如只唤醒消费者,或者只唤醒生产者),这也是 Java ArrayBlockingQueue 的底层实现原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class MyBuffer {

private final ReentrantLock lock = new ReentrantLock();
// 定义两个“休息室”
private final Condition producerCondition = lock.newCondition();
private final Condition consumerCondition = lock.newCondition();
private final Object[] items = new Object[10];
private int count = 0;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
//【防止虚假唤醒的标准姿势】醒来后回到 while 开头,再次检查 count
producerCondition.await(); // 缓冲区满了,生产者去 producerCondition 休息室等着
}
items[count++] = x;
consumerCondition.signal(); // 生产了东西,唤醒在 consumerCondition 休息室等的消费者
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
//【防止虚假唤醒的标准姿势】醒来后回到 while 开头,再次检查 count
consumerCondition.await(); // 没东西了,消费者去 consumerCondition 休息室等着
}
Object x = items[--count];
producerCondition.signal(); // 消费了空间,唤醒在 producerCondition 休息室等的生产者
return x;
} finally {
lock.unlock();
}
}
}

灵活性:tryLock() 可以防止死锁和长时间卡顿。
公平性:new ReentrantLock(true) 可以开启公平锁(按排队顺序拿锁,虽然性能略低)。
精准度:Condition 可以实现比 notifyAll() 更细粒度的线程调度。


关于 Condition 的介绍

在上面 MyBuffer 的案例中,我们已经看到了Condition 的使用。实际上,在没有 Condition 之前,我们只能用 Object.wait() 和 notify()。但它们有一个致命缺点:无法定向唤醒。如果你调用 notifyAll(),所有在等待的线程(无论是生产者还是消费者)都会被唤醒去抢锁,这会造成严重的 “锁竞争” 和“无效唤醒”。Condition 的出现解决了这个问题:

  • 多休息室机制:一个 Lock 可以创建多个 Condition 实例。
  • 精准唤醒:你可以让生产者去 conditionA 等待,消费者去 conditionB 等待。当空间足够时,只唤醒 conditionA 里的生产者,互不干扰。
特性 Object (wait/notify) Condition (await/signal)
关联锁 对象的 Monitor 锁 ReentrantLock 实例
等待队列数量 每个对象仅 1 个 每个 Lock 可拥有多个
定向唤醒 不支持(只能 notify 或 notifyAll) 支持
超时/中断响应 支持 支持更丰富的 API (如 awaitNanos)
性能 高并发下竞争激烈 细粒度控制,性能更好

要看透 Condition,你只需要理解 AQS 维护的两套队列:

  • 同步队列 (Sync Queue):这就是我们之前研究的 CLH 队列,存放的是正在抢锁的线程。
  • 等待队列 (Wait Queue / Condition Queue):这是 Condition 对象内部维护的一个单向链表。

await() 的过程:

  • 入队:当前线程(已持有锁)被封装成 Node,放入该 Condition 的等待队列中。
  • 释放锁:彻底释放当前持有的锁(state 归零),以便其他线程能拿到锁。
  • 挂起:调用 LockSupport.park(this),线程在这一行停住。

signal() 的过程:

  • 出队:将 Condition 等待队列中的第一个节点移除。
  • 转移:将这个节点重新放入 AQS 的同步队列(Sync Queue)的队尾。
  • 唤醒:此时该线程并没有立即执行,而是开始排队抢锁。抢到锁后,它会从 await() 方法处醒来并继续执行。

避坑指南:

  • 永远在 while 循环中使用 await():这是为了防止虚假唤醒(Spurious Wakeup)。线程醒来后必须再次检查条件(如 count == 0),确保条件真的满足。
  • 必须持有锁:调用 await() 或 signal() 之前必须先 lock.lock(),否则会抛出 IllegalMonitorStateException(这和调用 wait() 必须先进入 synchronized 块是一个道理)。


ReentrantReadWriteLock

我们已经知道,在 AQS 中,state 是一个 32 位的整数。ReentrantReadWriteLock 将其 “切” 成了两半:State = (读锁次数 << 16)+ 写锁重入次数

  • 高 16 位:代表读锁(Shared)的状态,记录持有读锁的次数。
  • 低 16 位:代表写锁(Exclusive)的状态,记录写锁的重入次数。

这种设计利用了位运算的极速特性:

  • 获取读状态:state >>> 16(无符号右移)。
  • 获取写状态:state & 0x0000FFFF(位与掩码)。

读写锁的核心原则是:读读兼容,读写互斥,写写互斥。适用于 读多写少(如缓存、配置项)的场景。

写锁(独占模式),当线程尝试获取写锁时:

  • 检查 state 是否为 0。
  • 如果 state != 0 且 读锁次数 > 0,获取失败(读锁占着位,写锁进不来)。
  • 如果 state != 0 且 写锁次数 > 0,但持有者不是自己,获取失败。
  • 否则,CAS 增加低 16 位的值。

读锁(共享模式),当线程尝试获取读锁时:

  • 检查写锁次数(低 16 位)。
  • 如果写锁次数不为 0,且持有写锁的不是当前线程,获取失败(写锁占着位,读锁进不来)。
  • 否则,CAS 增加高 16 位的值。由于是共享模式,多个线程可以同时成功。

特殊机制,锁降级(Lock Downgrading):持有写锁的线程,可以再去获取读锁,然后释放写锁。

  • 流程:获取写锁 → 获取读锁 → 释放写锁 → 依然持有读锁。
  • 目的:保证数据可见性。如果你先释放写锁再抢读锁,中间可能被别的写线程插足修改了数据。通过锁降级,可以在不释放 “独占权” 的情况下,平滑过渡到 “共享权”。
  • 注意不支持锁升级(持有读锁去抢写锁会造成死锁,因为多个读线程可能都在等对方放锁)。

案例一:高并发本地缓存(最经典)

如果你在 Spring Boot 中自己实现一个简单的缓存池(不使用 Redis),用 HashMap 配合 ReentrantLock 会导致所有读操作也排队,性能极差。而读写锁可以让上千个线程同时读取,只有在数据更新时才阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CacheDataCenter {
private final Map<String, Object> map = new HashMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();

// 读操作:并发执行,互不阻塞
public Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}

// 写操作:独占执行,且阻塞所有的读
public void put(String key, Object value) {
w.lock();
try {
map.put(key, value);
} finally {
w.unlock();
}
}
}

案例二:锁降级(保证数据可见性)

常用于 “读取-判断-更新” 的逻辑中。比如:你读取了一个配置,发现过期了,需要更新它,但在更新完后你还要继续使用这个配置进行业务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void processCachedData() {
r.lock();
if (!cacheValid) {
// 必须先释放读锁,才能获取写锁
r.unlock();
w.lock();
try {
// 再次检查状态,因为抢写锁期间可能被别人改了(双重检查)
if (!cacheValid) {
data = fetchFromDB();
cacheValid = true;
}
// 【核心:锁降级】在释放写锁前,先获取读锁
r.lock();
} finally {
w.unlock(); // 释放写锁,但依然持有读锁
}
}

try {
use(data); // 此时持有读锁,保证在此期间没有别的线程能写数据
} finally {
r.unlock();
}
}

注意:虽然读写锁看起来很美,但它有三个著名的缺陷:
开销更大:它的内部逻辑(处理位运算、两个 Lock 对象、复杂的传播逻辑)比 ReentrantLock 复杂得多。如果读操作耗时极短(比如只是简单的 return a + b),读写锁的额外开销反而会让吞吐量下降。
写饥饿(Write Starvation):如果读线程源源不断,写线程可能永远抢不到锁,一直等下去。
不支持锁升级:持有读锁时申请写锁会导致死锁。


StampedLock 原理和案例

StampedLock 在 JDK 8 中被引入,它并不是 AQS 的子类,而是通过一个 long 类型的 Stamp(戳记)和一套完全不同的状态机来实现的。它的精髓在于引入了 “乐观读”,这让读操作在大多数情况下完全不加锁。

① 乐观读 (Optimistic Reading) —— 性能杀手锏,这是它快的根本原因。

  • 逻辑:它并不真正加锁,而是返回一个 stamp。所以读操作不会阻塞写操作。
  • 过程:线程读取数据,读完后通过 validate(stamp) 检查。如
    • 果期间没有写操作,校验成功,直接返回;
    • 如果校验失败(说明有写操作进来了),它会升级为传统的悲观读锁。

② 悲观读 (Read Lock)——与 ReentrantReadWriteLock 的读锁类似,会阻塞写操作,但允许多个读线程并发。

③ 写锁 (Write Lock)——独占锁,阻塞一切读写操作。

StampedLock 的状态表示(它使用一个 long state):

  • 低 7 位:记录读锁的数量。
  • 第 8 位:写锁标志位。
  • 其余高位:版本号(每次写操作都会增加版本号,用于 validate 校验)。

典型案例:高性能坐标系点位读取

这是 StampedLock 官方文档中最经典的案例。假设我们要读取一个二维坐标 (x, y)​,由于 x 和 y 必须保持一致性,使用乐观读可以极大提升效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

// 读方法:演示乐观读
void moveIfAtOrigin(double newX, double newY) {
// 1. 尝试获取一个乐观读戳记
long stamp = sl.tryOptimisticRead();
double curX = x, curY = y;

// 2. 检查在读取过程中,有没有写锁被获取过
if (!sl.validate(stamp)) {
// 3. 校验失败,说明数据可能被改了,升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
sl.unlockRead(stamp);
}
}

// 业务逻辑判断
if (curX == 0.0 && curY == 0.0) {
long ws = sl.writeLock(); // 升级为写锁
try {
x = newX;
y = newY;
} finally {
sl.unlockWrite(ws);
}
}
}
}


同步器们

CountDownLatch 案例

CountDownLatch基于 AQS 的共享模式实现。它的逻辑非常纯粹:就是一个倒计时门闩。

状态定义 (State):AQS 的 state 在这里被定义为计数器,new CountDownLatch(N) 会将 state 设置为 N。

阻塞逻辑 (await):

  • 线程调用 await() 时,会触发 tryAcquireShared。
  • 判断准则:只要 state != 0,tryAcquireShared 就返回负数(表示获取失败)。
  • 结果:线程进入 AQS 同步队列阻塞(LockSupport.park),就像在门口等待起跑指令的运动员。

释放逻辑 (countDown):

  • 线程调用 countDown() 时,会触发 tryReleaseShared。
  • 操作:通过 CAS 将 state 减 1。
  • 关键点:只有当 state 减到 0 的那一刻,tryReleaseShared 才会返回 true。
  • 传播(Propagate):一旦 state 归零,AQS 会调用 doReleaseShared,从头节点开始,依次唤醒所有在队列中等待的线程。

案例一:多服务并行调用的“数据聚合”

在微服务架构中,一个详情页可能需要调用:商品服务、库存服务、评价服务、营销服务。为了提高响应速度,我们会并行调用,最后统一汇总。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class DataAggregationDemo {

public static void main(String[] args) throws InterruptedException {
// 模拟 4 个并发任务
CountDownLatch latch = new CountDownLatch(4);
ExecutorService executor = Executors.newFixedThreadPool(4);

System.out.println("--- 页面请求开始,启动并行查询 ---");
long start = System.currentTimeMillis();

// 模拟四个接口调用
executor.execute(new Task("商品详情", 200, latch));
executor.execute(new Task("库存状态", 150, latch));
executor.execute(new Task("用户评价", 500, latch));
executor.execute(new Task("营销活动", 300, latch));

// 主线程等待,最多等 1 秒,防止某个接口卡死导致整个页面崩溃
// 服务员在门口等 1 秒,如果菜还没出来,他直接回来告诉你:“菜太慢了,别等了,先吃别的吧。”
//// 线程进入 TIMED_WAITING 状态,之后:
//// 如果 1 秒内 计数器归零了:线程被唤醒,方法立即返回 true。
//// 如果 1 秒到了 计数器还没归零:JVM 内部的定时器会唤醒该线程,线程发现自己是因为“超时”醒来的,于是自动退出同步队列,方法返回 false。
boolean success = latch.await(1, TimeUnit.SECONDS);
if (success) {
System.out.println("--- 所有数据获取成功,耗时: " + (System.currentTimeMillis() - start) + "ms ---"); // 504ms
} else {
System.out.println("--- 部分接口超时,进行降级处理 ---");
}
executor.shutdown();
}

static class Task implements Runnable {
private final String name;
private final int cost;
private final CountDownLatch latch;

Task(String name, int cost, CountDownLatch latch) {
this.name = name;
this.cost = cost;
this.latch = latch;
}

@Override
public void run() {
try {
Thread.sleep(cost); // 模拟网络耗时
System.out.println(name + " 查询完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 无论成功失败,必须 countDown
}
}
}
}

模拟高并发抢购(发令枪模式)

这种模式下,CountDownLatch 的计数器设为 1。所有的“运动员”线程都在等待这一个信号,从而实现瞬间的爆发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class HighConcurrencyDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch fireGun = new CountDownLatch(1);
int runners = 5;

for (int i = 0; i < runners; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 已就位...");
fireGun.await(); // 所有人在这里阻塞,等待 state 变为 0
System.out.println(Thread.currentThread().getName() + " 冲击秒杀接口!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "运动员-" + i).start();
}

Thread.sleep(2000); // 准备时间
System.out.println("--- 砰!发令枪响 ---");
fireGun.countDown(); // state 归零,触发 AQS 的传播唤醒
}
}

分段大数据处理(任务拆解)

假设有一个巨大的任务需要拆分成多个子任务,主线程需要汇总所有子任务的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class BatchProcessDemo {
public static void main(String[] args) throws InterruptedException {
int totalRows = 1000;
int threadCount = 10;
int rowsPerThread = totalRows / threadCount;

CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger totalProcessed = new AtomicInteger(0);

for (int i = 0; i < threadCount; i++) {
final int startRow = i * rowsPerThread;
new Thread(() -> {
try {
// 模拟分段处理逻辑
System.out.println(Thread.currentThread().getName() + " 正在处理行: " + startRow + " 到 " + (startRow + rowsPerThread));
Thread.sleep((long) (Math.random() * 1000));
totalProcessed.addAndGet(rowsPerThread);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}

latch.await(); // 等待所有子线程收工
System.out.println("--- 任务全部完成,总计入库: " + totalProcessed.get() + " 行数据 ---");
}
}


Semaphore 原理和案例

如果说 CountDownLatch 是一个 “倒计时发令枪”,那么 Semaphore 就是一个 “共享资源池的管理员”,它同样是基于 AQS 的 共享模式实现的。Semaphore 维护了一组 “许可”(Permits)。线程在访问资源前必须先获得许可,访问结束后释放许可。不同于 CountDownLatch 的一次性使用,Semaphore 是可以复用的(借了还,还了再借)。

状态定义 (State):AQS 的 state 在这里代表 “当前剩余的许可数量”。new Semaphore(5) 会将 state 设置为 5。

获取许可 (acquire):线程调用 acquire() 时,会触发 tryAcquireShared,它会尝试用 CAS 将 state 减去请求的数量。如果剩余 state >= 0 则 CAS 成功,线程继续执行;如果剩余 state < 0 则 CAS 失败,线程进入 AQS 同步队列阻塞(挂起),等待别人归还许可。

释放许可 (release),线程调用 release() 时,触发 tryReleaseShared。通过 CAS 将 state 加 1(归还许可)。

  • 逻辑:通过 CAS 将 state 加 1(归还许可)。
  • 传播(Propagate):一旦许可归还成功,它会调用 doReleaseShared,唤醒同步队列中排在最前面的线程,让它再次尝试抢票。

CountDownLatch最核心的价值在于 “限流”(Control Concurrency)。

案例:数据库连接池 / 资源池限流

假设数据库只允许 10 个并发连接,如果 100 个线程同时去连,数据库会宕机。我们可以用 Semaphore 挡在前面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DatabasePoolDemo {
public static void main(String[] args) {
// 模拟只有 3 个数据库连接资源
Semaphore semaphore = new Semaphore(3);
ExecutorService executor = Executors.newFixedThreadPool(10);

for (int i = 0; i < 10; i++) {
final int threadId = i;
executor.execute(() -> {
try {
System.out.println("用户 " + threadId + " 正在排队等连接...");
// acquire(): 死等。除非拿到票或被中断,否则不回头,适用于必须拿到资源的强依赖业务。
// tryAcquire(): 不等待。拿得到就返回 true,拿不到立刻返回 false。适用于高性能场景,拿不到就走别的逻辑(如降级)。
// tryAcquire(time, unit): 限时等。等一段时间,还没票就放弃。最常用的保护手段,防止线程池被卡死。
semaphore.acquire(); // 抢占连接。线程 park,直到它成功拿到 “许可证”。
System.out.println("用户 " + threadId + " >>> 成功获取连接,开始查询数据...");
Thread.sleep((long) (Math.random() * 3000)); // 模拟查询耗时
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("用户 " + threadId + " <<< 释放连接");
semaphore.release(); // 归还连接
}
});
}
executor.shutdown();
}
}


CyclicBarrier 原理和案例

CyclicBarrier(循环栅栏)基于 ReentrantLock 和 Condition 构建,它的核心逻辑是:人齐了才发车,且这辆车可以反复开。其工作机制更像是一个 “关卡”。

计数器逻辑 (parties & count):

  • parties:初始设定的参与人数(比如 5 人组队)。
  • count:当前还在等待的人数。每当一个线程调用 await(),count 就会减 1。

阻塞与唤醒 (Condition.await / signalAll):

  • 当线程调用 await() 时,获取独占锁 ReentrantLock,count–。
  • 如果 count > 0,调用 trip.await(),线程进入 Condition 的等待队列并释放锁。
  • 如果 count == 0,说明最后一个人到了!此时会执行可选的 barrierAction(发车指令),然后调用 trip.signalAll() 唤醒所有人。

循环的奥秘 (Generation):

  • 一旦所有人被唤醒,CyclicBarrier 会开启一个 “新一代 (New Generation)”。
  • 它会把 count 重新重置为 parties。这就是为什么它叫 Cyclic(循环)。

CyclicBarrier 通常用于 “多线程分步协同计算” 的场景。

案例:并行计算结果汇总(分阶段任务)

假设一个复杂的计算任务分为三个阶段,必须所有线程都完成第一阶段,才能开始第二阶段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CyclicBarrierDemo {
public static void main(String[] args) {
int players = 3;
// 定义栅栏:3个人到齐后,由最后一个人顺便执行“宣布结果”的任务
CyclicBarrier barrier = new CyclicBarrier(players, () -> {
System.out.println("--- 所有人都到齐了,关卡开启! ---");
});

for (int i = 0; i < players; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("玩家 " + id + " 正在加载地图...");
Thread.sleep((long) (Math.random() * 2000));

System.out.println("玩家 " + id + " 已到达第一关栅栏。");
// 只等 5 秒,如果人没齐,直接抛出 TimeoutException,防止死等!
barrier.await(5, TimeUnit.SECONDS); // 在此等待,直到 3 个人都执行到这一行

System.out.println("玩家 " + id + " 开始打怪...");
Thread.sleep((long) (Math.random() * 2000));

System.out.println("玩家 " + id + " 已到达第二关栅栏。");
barrier.await(); // 循环使用:再次等待

System.out.println("玩家 " + id + " 顺利通关!");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
玩家 2 正在加载地图...
玩家 1 正在加载地图...
玩家 0 正在加载地图...
玩家 1 已到达第一关栅栏。
玩家 2 已到达第一关栅栏。
玩家 0 已到达第一关栅栏。
--- 所有人都到齐了,关卡开启! ---
玩家 0 开始打怪...
玩家 1 开始打怪...
玩家 2 开始打怪...
玩家 1 已到达第二关栅栏。
玩家 2 已到达第二关栅栏。
玩家 0 已到达第二关栅栏。
--- 所有人都到齐了,关卡开启! ---
玩家 0 顺利通关!
玩家 1 顺利通关!
玩家 2 顺利通关!
处理方式 特点 适用场景
死等 (await) 简单但危险 确定任务百分百不会失败且不超时的环境
超时 (timed await) 安全,自动唤醒 最通用的后端接口处理方式
手动 reset 暴力重置,全员失败 需要统一回滚或重新开始的任务
Phaser 动态注销 智能降级,存活者继续 复杂的分布式计算,允许部分失败


Phaser 的原理和典型用法

Phaser(移相器)是 Java 7 引入的并发工具,堪称 JUC 家族中最复杂但也最强大的同步辅助类。

简单来说,Phaser = CyclicBarrier + CountDownLatch + 动态调整 + 分层架构

Phaser 的核心在于 Phase(阶段/相位)和 Parties(参与者)的动态解耦。

  • 动态注册(Dynamic Registration):CyclicBarrier 的人数在构造时就定死了。而 Phaser 允许你在运行过程中随时 register()(增加人)或 arriveAndDeregister()(减少人)。
  • 状态推进(Phase Advance):Phaser 内部维护一个 phase 数字。当所有注册的参与者都到达(arrive)时,phase 会自动加 1。这就像是游戏里的 “关卡数”。
  • 非阻塞与阻塞的灵活选择:
    • arrive():我到了,但我还要去干别的,不用等我。
    • arriveAndAwaitAdvance():我到了,我要在这里等大家都到齐再进下一关。

典型用法:解决“队友掉线”与“动态增减”

案例:支持“自动减员”的任务协同

这个例子展示了:如果一个线程在执行过程中崩溃,它如何 “优雅退出”,让剩下的线程继续运行,而不是大家一起卡死。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class PhaserDemo {
public static void main(String[] args) {
// 初始化 3 个参与者
Phaser phaser = new Phaser(3) {
// 每当一个阶段完成时,会自动触发这个方法(类似 CyclicBarrier 的 barrierAction)
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("\n=== 第 " + phase + " 阶段完成,当前剩余人数: " + registeredParties + " ===\n");
return registeredParties == 0; // 如果没人了,Phaser 就彻底终结
}
};

for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
try {
// 第一阶段
System.out.println("玩家 " + id + " 正在加载地图...");
Thread.sleep((long) (Math.random() * 2000));

// 模拟玩家 1 发生异常(比如你说的 1/0)
if (id == 1) {
throw new RuntimeException("玩家 1 网络断开...");
}

System.out.println("玩家 " + id + " 到达第一关。");
phaser.arriveAndAwaitAdvance(); // 到达并等待他人

// 第二阶段
System.out.println("玩家 " + id + " 开始打怪...");
Thread.sleep((long) (Math.random() * 2000));
phaser.arriveAndAwaitAdvance();

System.out.println("玩家 " + id + " 顺利通关!");
phaser.arriveAndDeregister(); // 结束并注销自己

} catch (Exception e) {
System.err.println("!!! 捕获到异常: " + e.getMessage());
// 【关键点】如果出事了,直接注销自己
// 这样 Phaser 的“总人数”会减 1,剩下的队友不需要等他,可以继续前进!
phaser.arriveAndDeregister();
}
}).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
玩家 2 正在加载地图...
玩家 0 正在加载地图...
玩家 1 正在加载地图...
玩家 2 到达第一关。
!!! 捕获到异常: 玩家 1 网络断开...
玩家 0 到达第一关。

=== 第 0 阶段完成,当前剩余人数: 2 ===

玩家 0 开始打怪...
玩家 2 开始打怪...

=== 第 1 阶段完成,当前剩余人数: 2 ===

玩家 2 顺利通关!
玩家 0 顺利通关!

=== 第 2 阶段完成,当前剩余人数: 0 ===


Exchanger 原理和案例

做一个形象的类比,Exchanger 就像两个人在公园长椅上交换情报:只有当两个人都坐下时,交换才会发生;如果只有一个人到了,他必须一直等着另一个人。Exchanger 是一个用于线程间协作的同步点,主要应用于两个线程在一个点上交换彼此的数据。

核心方法为 exchange(V x),这是一个阻塞式方法。配对逻辑如下:

  • 线程 A 到达交换点,调用 exchange(“情报A”)。
  • 由于 线程 B 还没到,线程 A 进入等待状态(WAITING)。
  • 线程 B 终于到了,调用 exchange(“情报B”)。
  • 关键时刻:Exchanger 内部将 A 的数据给 B,将 B 的数据给 A。
  • 两个线程同时被唤醒,继续执行。

底层机制:槽位 (Slot) 与 消除 (Elimination)

  • 单槽位:在低并发下,内部使用一个 Slot(槽位)。第一个到的线程把数据放进去,第二个到的线程取走并留下自己的数据。
  • 多槽位(Arena):在高并发下,为了减少竞争,Exchanger 内部会使用类似 LongAdder 的 Arena(竞技场)数组。不同的线程对会在不同的槽位上进行交换,极大地提高了吞吐量。

典型案例:双缓冲区(Pipeline Buffer)

这是 Exchanger 最经典、最无可替代的场景:生产者-消费者模型的双缓冲优化。

通常的生产者-消费者模型使用 BlockingQueue。但如果数据量极大,频繁的入队/出队会产生大量的锁竞争。对于这种情况,我们可以准备两个缓冲区(Buffer A 和 Buffer B)。生产者往 A 里塞数据,消费者从 B 里取数据。当生产者塞满了,消费者取空了,两人交换缓冲区。

特性 BlockingQueue Exchanger
数据流向 单向(生产 → 消费) 双向(互换)
参与人数 任意多个 必须成对 (2个)
性能优势 频繁锁竞争 极低的内存开销(直接引用交换)
典型比喻 传送带 秘密交易
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<List<String>> exchanger = new Exchanger<>();

// 生产者线程
new Thread(() -> {
List<String> buffer = new ArrayList<>();
try {
for (int i = 0; i < 2; i++) { // 模拟两轮交换
System.out.println("生产者: 正在填充缓冲区...");
Thread.sleep(2000);
buffer.add("数据为 " + Math.random());

System.out.println("生产者: 缓冲区已满,大小 " + buffer.size() + ",等待交换...");
// 交换:把满的 buffer 发出去,得到一个空的 buffer 回来
buffer = exchanger.exchange(buffer);
System.out.println("生产者: 交换成功,得到新缓冲区,大小: " + buffer.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 消费者线程
new Thread(() -> {
List<String> buffer = new ArrayList<>();
try {
for (int i = 0; i < 2; i++) {
System.out.println("消费者: 正在处理现有的空缓冲区 (等待中)...");
// 交换:把空的 buffer 发出去,得到一个满的 buffer 回来
buffer = exchanger.exchange(buffer);

System.out.println("消费者: 拿到数据了,开始消费: " + buffer.get(0));
Thread.sleep(1000);
buffer.clear(); // 消费完清空
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
生产者: 正在填充缓冲区...
消费者: 正在处理现有的空缓冲区 (等待中)...
生产者: 缓冲区已满,大小 1,等待交换...
生产者: 交换成功,得到新缓冲区,大小: 0
生产者: 正在填充缓冲区...
消费者: 拿到数据了,开始消费: 数据为 0.23883715938894112
消费者: 正在处理现有的空缓冲区 (等待中)...
生产者: 缓冲区已满,大小 1,等待交换...
生产者: 交换成功,得到新缓冲区,大小: 0
消费者: 拿到数据了,开始消费: 数据为 0.6445937735171946

Exchanger 需要注意的问题:
死锁风险:如果你的线程数是奇数(比如 3 个线程调用 exchange),那么必然有一个线程会永远等下去,除非你使用带超时版本的 exchange(V x, long timeout, TimeUnit unit)。
垃圾回收:由于两个线程互相持有对方之前的对象引用,如果不注意手动清理,在某些复杂场景下可能会对 GC 造成微弱影响。


该怎么选同步组件?

JUC 同步组件的选择

你的并发场景属于哪种?

点击上方按钮获取建议...