0%

第5章:JUC显式锁的原理与实战

5.1 显式锁

使用 Java内置锁 时,无需通过 Java 代码显式地对同步对象的监视器进行抢占和释放,使用起来非常方便。但是不具备一些比较高级的锁功能:

  • 限时抢锁:设置超时时长,不至于无限等下去

  • 可中断抢锁:抢锁时,外部线程给抢锁线程发一个中断信号,就能唤起等待锁的线程,并终止抢占过程

  • 多个等待队列:为锁维持多个等待队列,以提高锁的效率。比如生产者-消费者模式中,生产者和消费者公用一把锁,锁上维持2个队列:一个生产队列和一个消费者队列

5.1.3 使用显式锁的模板代码

因为 JUC 中的显式锁都实现了 Lock 接口,所以不同类型的显式锁对象使用的方法都是模板化的、套路化的,模板代码如下:

1
2
3
4
5
6
7
8
9
10
11

//创建锁对象,SomeLock 为 Lock 的某个实现类,如 ReentrantLock
Lock lock = new SomeLock();
//step 1: 抢占锁
lock.lock;
try {
//step2 : 抢锁成功,执行临界区代码
doSomething();
} finally {
lock.unlock(); //step3: 释放锁
}

模板代码有几个需要注意的点:

  • 释放锁操作 unlock 必须在 try-catch 的finally 中执行,否则如果临界区代码抛出异常,锁就可能永远也得不到释放了

  • 抢占锁的操作lock 必须在 try 语句之外,原因:lock 方法不一定能够抢锁成功(我猜测作者是想说 tryLock() 之类的方法不一定会获取成功),如果没有抢占到锁,也肯定不需要释放锁,在没有占有锁的情况下释放锁可能导致异常

  • 在抢占锁操作 lock 和 try 语句之间不要插入任何代码,避免抛出异常而无法执行到 try,进而无法释放锁。

5.1.4 基于显式锁进行“等待-通知”方式的线程间通信

Java 内置锁可以通过 Object 的 wait 和 notify 方法来实现简单的线程间通信,与此类似的是,基于 Lock 显式锁,JUC 也提供了一个用于线程间通信的接口 Condition

Condition 接口有2类主要方法:

  • await() : 在功能上与 Object.wait() 语意等效,线程会加入 await() 等待队列,并释放当前锁
  • signal() : 在功能上与 Object.notify() 语意等效,唤醒 await() 等待队列中的线程

为了避免与 Object 中的 wait/notify 2类方法在使用时发生混淆,JUC 对 Condition 接口方法改了名称,成为了 await/signal。Condition 对象的 signal 和同一个对象的 await 是一一配对使用的

Condition 对象是基于显式锁的,所以不能独立创建 Condition 对象,可以通过 lock.newCondition() 方法获取一个与当前显式锁绑定的 Condition 对象。用法举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

class WaitTarget implements Runnable {
@Override
public void run() {
lock.lock();//抢锁
try {
print("开始等待");
condition.await();//开始等待,并且释放锁
print("收到通知,开始继续执行");
} finally {
lock.unlock();//释放锁
}
}
}

使用 await 方法前必须要先获取锁,await 方法会让当前线程加入 Condition 的等待队列,同理, signal 方法也要在获取锁之后才能调用,调用 signal 之后一定要释放锁,只有这样被唤醒的等待线程才能抢锁。

5.1.5 LockSupport

LockSupport 是JUC 提供的一个 线程阻塞与唤醒的工具类。大体有2类方法(阻塞和唤醒):

1
2
3
4
5
6
// 无限期阻塞当前线程
public static void park();

// 唤醒某个被阻塞的线程
public static void unpark(Thread thread);

一个简单的演示的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class WaitTarget implements Runnable {
@Override
public void run() {

try {
print("即将进入阻塞");
LockSupport.park();//阻塞当前线程
if (Thread.currentThread().isInterrupted()) {
print("被中断了");
} else {
print("被重新唤醒");
}

} finally {
lock.unlock();//释放锁
}
}
}

5.1.5.1 LockSupport.park() 的对比(自己改的标题)

1、与 Thread.sleep() 的区别

  • Thread.sleep() 只能自己醒来,没法外部唤醒;LockSupport.park() 可以通过 unpark 唤醒
  • Thread.sleep() 声明了中断异常(InterruptedException) ,而LockSupport.park() 没有
  • 被中断的时候,虽然线程都会被设置中断标记,但是线程表现不同:sleep 会抛异常,park 不会

2、与 Object.wait() 的区别

  • wait 需要在同步块中执行,park 可以在任意地方执行
  • 当阻塞线程被中断时,wait 方法抛出中断异常;而park 不会抛出异常
  • 如果没有调用过 wait 而直接执行 notify 会导致 IllegalMonitorStateException异常;而未做park 直接做 unpark 不会有任何异常

自己看了下 LockSupport 的源码,发现 park 和 unpark 都是 native 方法,所以在代码层面就没对比了

5.1.6 显式锁分类

从多个维度分类: 可重入、悲观/乐观、公平、共享/独占、可中断/不可中断

5.2.2 通过 CAS 实现乐观锁

乐观锁通过 CAS 实现主要就是两个步骤:

  1. 冲突检测 (CAS 检测内存位置 V 的值是否为 A)
  2. 数据更新 (CAS 上述检测如果是,则将位置 V 更新为 B 值,否则不更改)

在实际使用中,仅仅进行一次 CAS 是不够的,一般情况下需要不断循环重试直到CAS 操作成功,也即自旋

乐观锁是一种思想,CAS 是这种思想的一种实现

作为演示,这里设计一个简单版本的不可重入(如果需要重入就count计数,这里不贴例子了)的自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyLock implements Lock {

//当前锁的拥有者
private AtomicReference<Thread> owner = new AtomicReference<>();

@Override
public void lock() {
Thread t = Thread.currentThread();
//书中的例子这里是写错了,它写成 while(owner.compareAndSet(null, t))
while(!owner.compareAndSet(null, t)) {//循环竞争锁
//没获取到锁,让出cpu
Thread.yield();
}
}

@Override
public void unlock() {
Thread t = Thread.currentThread();
if(t == owner.get()) {
owner.set(null);
}
}
}

5.2.5 CAS 可能导致“总线风暴”

为了保障“缓存一致性”,不同的内核需要通过总线来回通信,使用 lock 前缀(用于内存屏障)指令的 Java 操作(比如CAS、volatile)会产生缓存一致性流量,很多线程同时执行lock前缀操作时,会在总线上产生过多的流量,也就是 “总线风暴”。

那么,基于 JUC 实现的轻量级锁怎么避免总线风暴?答案是:使用队列对抢锁线程进行排队。

5.2.6 CLH自旋锁

CLH锁就是一种基于队列排队的自旋锁(由3个发明人的名字命名的),AQS 也是基于这种原理,为了说明其原理,这里实现一个 CLH 锁的学习版本,并不是真正的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class CLHLock implements Lock {
/**
* 当前节点的线程本地变量
*/
private static ThreadLocal<Node> curNodeLocal = new ThreadLocal();
/**
* CLHLock队列的尾部指针,使用AtomicReference,方便进行
* CAS操作
*/
private AtomicReference<Node> tail = new AtomicReference<>(null);

public CLHLock() {
//设置尾部节点
tail.getAndSet(Node.EMPTY);
}

//加锁操作:将节点添加到等待队列的尾部
@Override
public void lock() {
Node curNode = new Node(true, null);
Node preNode = tail.get();
//CAS自旋:将当前节点插入队列的尾部
while (!tail.compareAndSet(preNode, curNode)) {
preNode = tail.get();
}
//设置前驱节点
curNode.setPrevNode(preNode);

// 自旋,监听前驱节点的locked变量,直到其值为false
// 若前驱节点的locked状态为true,则表示前一个线程还在抢占或者占有锁
while (curNode.getPrevNode().isLocked()) {
//让出CPU时间片,提高性能
Thread.yield();
}
// 能执行到这里,说明当前线程获取到了锁
// Print.tcfo("获取到了锁!!!");
//将当前节点缓存在线程本地变量中,释放锁会用到
curNodeLocal.set(curNode);
}

//释放锁
@Override
public void unlock() {
Node curNode = curNodeLocal.get();
curNode.setLocked(false);
curNode.setPrevNode(null); //help for GC
curNodeLocal.set(null); //方便下一次抢锁
}

//虚拟等待队列的节点
@Data
static class Node {
public Node(boolean locked, Node prevNode) {
this.locked = locked;
this.prevNode = prevNode;
}

// true:当前线程正在抢占锁,或者已经占有锁
// false:当前线程已经释放锁,下一个线程可以占有锁了
volatile boolean locked;
// 前一个节点,需要监听其locked字段
Node prevNode;

// 空节点
public static final Node EMPTY = new Node(false, null);
}
// 省略其他代码
}

CLH 算法的几个要点就是(我理解的是,这种步骤就是公平锁环境下弄的,非公平锁不会每次头节点获得锁):

  1. 初始状态队列尾部(tail)指向一个 EMPTY节点,tail 节点使用 AtomicReference 类型是为了让多线程并发操作时安全
  2. Thread 在抢锁时会创建一个 Node 加入等待队列尾部(默认lock 属性为true),同时自己作为新的尾部,这些操作通过 CAS 自旋操作
  3. Node 加入之后,会循环判断前去节点的 lock 属性是否为false,如果为false,即前驱节点释放了锁,当前节点获得了锁
  4. 当前node 获得锁之后,将locked 属性设置为true
  5. 临界区代码执行完毕后,当前节点的 locked 置为 false,方便后续节点获取锁

5.4.2 死锁的监测与中断

JDK 8 中包含一个 ThreadMXBean 接口,提供多种监视线程的方法:

  • findDeadlockedThreads :用于检测由于抢占JUC显式锁、Java内置锁引起死锁的线程。
  • findMonitorDeadlockedThreads:仅仅用于检测由于抢占Java内置锁引起死锁的线程。

5.5 共享锁与独占锁

JUC 中的共享锁包括 Semaphore(信号量)、ReadLock(读写锁中的读锁)、CountDownLatch 倒数闩

5.5.2 共享锁 Semaphore

Semaphore 可以用来控制在同一时刻共享资源的线程数量,维护了一组虚拟许可。将 Semaphore 称为一个许可管理器 更形象。

5.5.2.1 Semaphore 使用示例

Semaphore 使用一个很形象的场景是银行排队办理业务,只有 N 个窗口,M 个人在排队,那么其实相当于有 N 个许可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final Semaphore semaphore = new Semaphore(N);

Runnable r = () -> {
try {
//阻塞开始获取许可
semaphore.acquire(1);
//获取了一个许可
print("业务办理中");
//模拟业务操作: 处理排队业务
Thread.sleep(1000);
//用完了释放许可
semaphore.release(1);
} catch(Exception e) {
e.printStackTrace();
}
}

5.5.3 共享锁 CountDownLatch

CountDownLatch 功能相当于一个多线程环境下的倒数门闩,它可以指定一个计数值,在并发环境下由线程进行减1操作,当计数变为 0 之后,被 await 阻塞的线程将会唤醒。

它的一个经典示例就是,司机开车之前需要每个人报数,报数到 100 后说明人到齐发车:

1
2
3
4
5
6
7
8
9
10
11
12
13
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 1; i <= N; ++i) {// 启动报数任务
threadPoll.execute(new Runable() {
public void run() {
print("第" + i + "个人已到");
//倒数闩减1
doneSignal.countDown();
}
});
}
doneSignal.await(); //step2:等待报数完成,倒数闩计数值为0
print("人到齐,开车");

5.6 读写锁

读写锁的读和写操作的互斥原则如下:

  • 读读能共存
  • 读写不能共存
  • 写写不能共存

JUC 包中的读写锁接口为 ReadWriteLock ,主要有2个方法:

1
2
3
4
5
6
7
8
9
10
11
public interface ReadWriteLock {
/**
* 返回读锁
*/
Lock readLock();

/**
* 返回写锁
*/
Lock writeLock();
}

其主要实现类为 ReentrantReadWriteLock ,与 ReentrantLock 相比,前者更适合 读多写少 的场景,而 ReentrantLock 适合 读写比例相差不大 的场景

5.6.3 StampedLock 印戳锁

StampedLock 是对 ReentrantReadWriteLock 读写所的一种改进,主要改进为: 在没有写只有读的场景,StampedLock 支持不用加读锁而是直接进行读操作,最大限度提升读的效率,只有发生过写操作后,再加读锁才能进行读操作。

谢谢你的鼓励