Java 并发基础(五)
LockSupport 工具类
LockSupport 是个基础工具类,它的主要作用是挂起和唤醒线程, 该工具类是创建锁和其他同步类的实现基础。
LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用 LockSupport 类的方法的线程是不持有许可证的。LockSupport 是使用 Unsafe 类实现的, 下面介绍 LockSupport 中的几个主要函数。
park
如果调用 park 方法的线程已经拿到了与 LockSupport 关联的许可证,则调用 LockSupport.park() 时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起。如下代码直接在 main 函数里面调用 park 方法,最终只会输出 begin park!,然后当前 线程被挂起,这是因为在默认情况下调用线程是不持有许可证的。
1 | public static void main(String[] args) { |
在其他线程调用 unpark(Thread thread) 方法并且将当前线程作为参数时,调用 park 方法而被阻塞的线程会返回。另外,如果其他线程调用了阻塞线程的 interrupt() 方法,设置了中断标志或者线程被虚假唤醒,则阻塞线程也会返回。所以在调用 park 方法时最好也使用循环条件判断方式。需要注意的是,因调用 park() 方法而被阻塞的线程被其他线程中断而返回时并不会抛出 InterruptedException 异常。
1 | public static void main(String[] args) throws InterruptedException { |
1 | t1: 开始 park.. 【等2秒】 |
另外 park 方法还支持带有 blocker 参数的方法 void park(Object blocker) 方法,当线程 在没有持有许可证的情况下调用 park 方法而被阻塞挂起时,这个 blocker 对象会被记录到该线程内部。
1 | public static void park() { |
使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用 getBlocker(Thread) 方法来获取 blocker 对象的,所以 JDK 推荐我们使用带有 blocker 参数的 park 方法 , 并且 blocker 被设置为 this,这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了。例如下面的代码。
1 | public class Demo { |
运行代码后,使用 jstack pid 命令查看线程堆栈时可以看到如下输出结果。
1 | $ jps |
修改代码 (1) 为 LockSupport.park(this) 后运行代码,则 jstack pid 的输出结果为
1 | "main" #1 prio=5 os_prio=31 cpu=38.76ms elapsed=12.91s tid=0x00007ff79a008200 nid=0x2503 waiting on condition [0x000070000794b000] |
使用带 blocker 参数的 park 方法,线程堆栈可以提供更多有关阻塞对象的信息。
unpark
当一个线程调用 unpark 时,如果参数 thread 线程没有持有 thread 与 LockSupport 类关联的许可证,则让 thread 线程持有。如果 thread 之前因调用 park() 而被挂起,则调用 unpark 后,该线程会被唤醒。如果 thread 之前没有调用 park,则调用 unpark 方法后,再 调用 park 方法,其会立刻返回。修改代码如下。
1 | public static void main(String[] args) { |
1 | begin park! |
下面再来看一个例子以加深对 park 和 unpark 的理解。
1 | public static void main(String[] args) throws InterruptedException { |
1 | t1: 开始 park.. 【等2秒】 |
parkNanos(nanos)
和 park 方法类似,如果调用 park 方法的线程已经拿到了与 LockSupport 关联的许可证, 则调用 parkNanos(long nanos) 方法后会马上返回。该方法的不同在于,如果没有拿到许可证,则调用线程会被挂起 nanos 时间后修改为自动返回。有没有既挂起 nanos 时间,又记录阻塞对象的 park?有的,那就是 void parkNanos(Object blocker, long nanos) 方法。
parkUntil(blocker, deadline)
1 | public static void parkUntil(long deadline) { |
其中参数 deadline 的时间单位为 ms,该时间是从 1970 年到现在某一个时间点的毫秒 值。这个方法和 parkNanos(Object blocker, long nanos) 方法的区别是,后者是从当前算等待 nanos 秒时间,而前者是指定一个时间点,比如需要等到 2017.12.11 日 12:00:00,则把这个时间点转换为从 1970 年到这个时间点的总毫秒数。看一个例子:
1 | static class FIFOMutex { |
这是一个先进先出的锁,也就是只有队列的首元素可以获取锁。在代码(1)处,如 果当前线程不是队首或者当前锁已经被其他线程获取,则调用 park 方法挂起自己。
然后在代码 (2) 处判断,如果 park 方法是因为被中断而返回,则忽略中断,并且重置中断标志,做个标记,然后再次判断当前线程是不是队首元素或者当前锁是否已经被其他线程获取,如果是则继续调用 park 方法挂起自己。
然后在代码 (3) 中,判断标记,如果标记为 true 则中断该线程,这个怎么理解呢? 其实就是其他线程中断了该线程,虽然我对中断信号不感兴趣,忽略它,但是不代表其他线程对该标志不感兴趣,所以要恢复下。
基础的 AQS 组件
AQS 简介
AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件, 并发包中锁的底层就是使用 AQS 实现的。另外,大多数开发者可能永远不会直接使用 AQS,但是知道其原理对于架构设计还是很有帮助的。
#getExclusiveOwnerThread(): Thread
-tail: Node
-head: Node
-unsafe: Unsafe
-stateOffset: long
-headOffset: long
-tailOffset: long
-waitStatusOffset: long
-nextOffset: long
#tryAcquire(arg: int): boolean
#acquireShared(arg: int): void
#acquireInterruptibly(arg: int): void
#release(arg: int): boolean
#releaseShared(arg: int): boolean
#tryRelease(arg: int): boolean
#tryReleaseShared(arg: int): boolean
-lastWaiter: Node
+signalAll(): void
+await(): void
#EXCLUSIVE: Node
#CANCELLED: int
#SIGNAL: int
#CONDITION: int
#PROPAGATE: int
#waitStatus: int
#prev: Node
#next: Node
#thread: Thread
#nextWaiter: Node
+predecessor(): Node
AQS 是一个 FIFO 的双向队列,其内部通过节点 head 和 tail 记录队 首和队尾元素,队列元素的类型为 Node。其中 Node 中的 Thread waiter 变量用来存放进入 AQS 队列里面的线程; Node 节点内部的 SHARED 用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的,EXCLUSIVE 用来标记线程是获取独占资源时被挂起后放入 AQS 队列的; wait status 记录当前线程等待状态,可以为 CANCELLED(线程被取消了)、 SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点); prev 记录当前节点的前驱节点,next 记录当前节点的后继节点。
在 AQS 中 维持了一个单一的状态信息 state, 可以通过 getState、setState、 compareAndSetState 函数修改其值。对于 ReentrantLock 的实现来说,state 可以用来表示当前线程获取锁的可重入次数; 对于读写锁 ReentrantReadWriteLock 来说,state 的高 16 位表示读状态,也就是获取该读锁的次数,低 16 位表示获取到写锁的线程的可重入次数; 对于 semaphore 来说,state 用来表示当前可用信号的个数; 对于 CountDownlatch 来说,state 用来表示计数器当前的值。
AQS 有个内部类 ConditionObject,用来结合锁实现线程同步。ConditionObject 可以直接访问 AQS 对象内部的变量,比如 state 状态值和 AQS 队列。ConditionObject 是条件变量,每个条件变量对应一个条件队列 (单向链表队列),其用来存放调用条件变量的 await 方法后被阻塞的线程,如类图所示,这个条件队列的头、尾元素分别为 firstWaiter 和 lastWaiter。
对于 AQS 来说,线程同步的关键是对状态值 state 进行操作。根据 state 是否属于一个 线程,操作 state 的方式分为独占方式和共享方式。
在独占方式下获取和释放资源使用的方法为 :
- void acquire(int arg)
- void acquireInterruptibly(int arg)
- boolean release(int arg)。
在共享方式下获取和释放资源的方法为:
- void acquireShared(int arg)
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源, 就会标记是这个线程获取到了,其他线程再尝试操作 state 获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁 ReentrantLock 的实现,当一个线程获取了 ReentrantLock 的锁后,在 AQS 内部会首先使用 CAS 操作把 state 状态值从 0 变为 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从 1 变为 2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过 CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。比如 Semaphore 信号量,当一个线程通过 acquire() 方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋 CAS 获取信号量。
在独占方式下,获取与释放资源的流程如下 :
当一个线程调用 acquire(int arg) 方法获取独占资源时,会首先使用 tryAcquire 方 法尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程 封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞队列的尾部,并调用 LockSupport.park(this) 方法挂起自己。
1
2
3
4
5public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}当一个线程调用 release(int arg) 方法时会尝试使用 tryRelease 操作释放资源,这里 是设置状态变量 state 的值,然后调用 LockSupport.unpark(thread) 方法激活 AQS 队列里面 被阻塞的一个线程 (thread)。被激活的线程则使用 tryAcquire 尝试,看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放 入 AQS 队列并被挂起。
1
2
3
4
5
6
7
8
9public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}需要注意的是,AQS 类并没有提供可用的 tryAcquire 和 tryRelease 方法,正如 AQS 是锁阻塞和同步器的基础框架一样,tryAcquire 和 tryRelease 需要由具体的子类来实现。 子类在实现 tryAcquire 和 tryRelease 时要根据具体场景使用 CAS 算法尝试修改 state 状态值 , 成功则返回 true, 否则返回 false。子类还需要定义,在调用 acquire 和 release 方法时 state 状态值的增减代表什么含义。比如继承自 AQS 实现的独占锁 ReentrantLock,定义当 status 为 0 时表示锁空闲,为 1 时表示锁已经被占用。在重写 tryAcquire 时,在内部需要使用 CAS 算法查看当前 state 是否为 0,如果为 0 则使用 CAS 设置为 1,并设置当前锁的持有者为当前线程,而后返回true,如果 CAS 失败则返回 false。
在共享方式下,获取与释放资源的流程如下 :
当线程调用 acquireShared(int arg) 获取共享资源时,会首先使用 tryAcquireShared 尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线 程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列的尾部,并使用 LockSupport.park(this) 方法挂起自己。
1
2
3
4public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}当一个线程调用 releaseShared(int arg) 时会尝试使用 tryReleaseShared 操作释放资源,这里是设置状态变量 state 的值,然后使用 LockSupport.unpark(thread) 激活 AQS 队列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryReleaseShared 查看当前状态变 量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还 是会被放入 AQS 队列并被挂起。
1
2
3
4
5
6
7public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}同样需要注意的是,AQS 类并没有提供可用的 tryAcquireShared 和 tryReleaseShared 方法,正如 AQS 是锁阻塞和同步器的基础框架一样,tryAcquireShared 和 tryReleaseShared 需要由具体的子类来实现。子类在实现 tryAcquireShared 和 tryReleaseShared 时要根据具体 场景使用 CAS 算法尝试修改 state 状态值,成功则返回 true,否则返回 false。比如继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false,否则使用 CAS 递增 state 的高 16 位 ( 在 ReentrantReadWriteLock 中,state 的高 16 位为获取读锁的次数 )。
另外,独占方式下的 void acquire(int arg) 和 void acquireInterruptibly(int arg),与共享方式下的 void acquireShared(int arg) 和 void acquireSharedInterruptibly(int arg), 这两套函数中都有一个带有 Interruptibly 关键字的函数,那么带这个关键字和不带有什么 区别呢? 其实不带 Interruptibly 关键字的方法的意思是不对中断进行响应,也就是线程在调用不带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就 是说不对中断进行响应,忽略中断。而线程在调用带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程会抛出 InterruptedException 异常而返回。
最后,我们来看看如何维护 AQS 提供的队列,主要看入队操作。当一个线程获取锁失败后该线程会被转换为 Node 节点,然后就会使用 enq(final Node node) 方法将该节点插入到 AQS 的阻塞队列。
1 | private Node enq(final Node node) { |
正如在基础篇中讲解的,notify 和 wait,是配合 synchronized 内置锁实现线程间同步 的基础设施一样,条件变量的 signal 和 await 方法也是用来配合锁 (使用 AQS 实现的锁) 实现线程间同步的基础设施。它们的不同在于,synchronized 同时只能与一个共享变量的 notify 或 wait 方法实现同步, 而 AQS 的一个锁可以对应多个条件变量。在基础篇中已做介绍,在调用共享变量的 notify 和 wait 方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的 signal 和 await 方法前也必须先获取条件变量对应的锁。看下面示例:
1 | public static void main(String[] args) { |
1 | begin wait 【阻塞在这儿】 |
在上面代码中,lock.newCondition() 的作用其实是 new 了一个在 AQS 内部声明的 ConditionObject 对象,ConditionObject 是 AQS 的内部类,可以访问 AQS 内部的变量(例 如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条 件变量的 await() 方法时被阻塞的线程。注意这个条件队列和 AQS 队列不是一回事。
在如下代码中,当线程调用条件变量的 await() 方法时(必须先调用锁的 lock() 方法 获取锁),在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的 state 变量的值), 并被阻塞挂起。这时候如果有其他线程调用 lock.lock() 尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的 await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在 await() 方法处阻塞。
1 | public final void await() throws InterruptedException { |
在如下代码中,当另外一个线程调用条件变量的 signal 方法时(必须先调用锁的 lock() 方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并 放入 AQS 的阻塞队列里面,然后激活这个线程。
1 | public final void signal() { |
我们用一个图总结如下: 一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。
基于 AQS 实现自定义同步器
如前文所讲的,自定义 AQS 需要 重写一系列函数,还需要定义原子变量 state 的含义。这里我们定义,state 为 0 表示目前锁没有被线程持有,state 为 1 表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。
1 | public class NonReentrantLock implements Lock, Serializable { |
NonReentrantLock 定义了一个内部类 Sync 用来实现具体的锁的操 作,Sync 则继承了 AQS。由于我们实现的是独占模式的锁,所以 Sync 重写了 tryAcquire、 tryRelease 和 isHeldExclusively 3 个方法。另外,Sync 提供了 newCondition 这个方法用来 支持条件变量。下面我们使用上节自定义的锁实现一个简单的生产—消费模型,代码如下。
1 | public class Demo { |