0%

3.1 协程的构造

3.1.1 协程的创建

Kotlin 提供了一个 createCoroutine 函数用来创建协程:

1
2
3
fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>

其中 suspend() -> T 是 createCoroutin 函数的 Receiver,对 Kotlin 函数不了解的话,这个还是有点费解的。我们依次剖析 createCoroutine 的参数和返回值:

  • Receiver 是一个被 suspend 修饰的挂起函数,这也是协程协程的执行体(协程体)

  • 参数是 completion(是 Continuation 类型)会在协程体执行完后调用,实际上就是协程完成的回调

  • 返回值也是一个 Continuation 对象,由于现在协程仅仅被创建出来,因此需要通过这个值在之后触发协程的启动

以一个例子来说明 createCoroutine 的用法:

1
2
3
4
5
6
7
8
9
10
val continuation = suspend {
println("In Coroutine.")
5
}.createCoroutine(object: Continuation<Int> {
override val context = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {
println("Coroutine End: $result")
}
})

目前协程被创建出来了,但是它还未启动。

3.1.2 协程的启动

在上述的例子中,我们已经创建了协程,之后只需要调用 continuation.resume(Unit) 之后,协程就会立即开始。为什么这样就可以触发协程体执行呢?

其实,我们创建协程得到的 continuation 是 SafeContinuation 的实例,不过这也是个“马甲”,它有个 delegate 属性,里面将 suspend() -> T 封装成了一个 Continuation 对象

也就是说,我们创建协程得到的 continuation 其实就是套了几层“马甲”的协程体,故调用这个 continuation.resume() 可以触发协程体的执行

一般来讲,我们创建协程之后就会启动它,所以标准库还提供了另一个一步到位的API——startCoroutin :

1
fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>)

我们知道,作为参数传入的 completion 就如同回调一样,**协程体的返回值会作为 resumeWith 的参数传入,例如,下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
suspend fun main() {
suspend {
println("我跑在协程里")
3
}.startCoroutine(object: Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {
println("协程执行完成,result = $result")
}
})
}

如果协程体执行正常,则 result 为 Success 结果;否则返回 Failure 结果。上述代码结果如下:

我跑在协程里
//协程执行完成,result = Failure(java.lang.IllegalStateException: 抛出异常)
协程执行完成,result = Success(3)

3.1.3 协程体的 Receiver

与协程创建和启动相关的 API 还有一组:

1
2
3
4
5
6
fun <R, T> (suspend R.() -> T).create(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>

// start 方法略

上述的 R 可以为协程体提供一个作用域,在协程体内我们可以直接使用,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//先把上述方法封装下,如果不封装怎么调用?
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override fun resumeWith(result: Result<T>) {
println("Coroutine End: $result")
}

override val context = EmptyCoroutineContext
})
}


//创建额外的域
class ProducerScope<T> {
suspend fun produce(value: T) {
println("我在额外的 Receiver 中了: $value")
}
}



//使用
suspend fun main() {
launchCoroutine(ProducerScope<Int>()) {
println("In Coroutine.")
produce (1024)
//delay (1000) //我自己测试的时候,如果放开这个注释,就不会打到 2048,coroutine end 也不会执行到
produce (2048)
}
}

这里要注意的一点是,如果额外的域 ProducerScope 添加了 @RestrictsSuspension 注解,则无法使用外部的函数,因此例子中的 delay 也不会调用

3.1.4 可挂起的 main 函数

前面我们一直这样写:

1
2
3
suspend fun main() {
....
}

这样就能在程序入口就获得一个协程了,这到底是怎么实现的呢?首先,我们能确认2点:

  • JVM 压根就不知道什么挂起函数,kotlin协程

  • JVM 肯定有一个 main 函数的

为了搞清楚原理,我们可以对 Kotlin 反编译成 Java 代码(IDEA的 Kotlin byteCode 功能):

1
2
3
public static void main(String[] var0) {
RunSuspendKt.runSuspend(new KotlinMainKt$$$main(var0));
}

可以看到,其实它是有真正的 main 函数的,里面的协程封装逻辑都被扔到 RunSuspendKt里面了

3.2.1 挂起函数

整个 kotlin 语境下有 2种函数: 普通函数和挂起函数,其中:挂起函数能调用任何函数,但是普通函数不能调用挂起函数。

所谓的协程挂起其实就是程序执行流程发生异步调用时,当前调用流程进入等待状态。注意:挂起函数不一定真的会挂起,只是提供了挂起的条件,那额什么情况才会真正挂起呢?

3.2.2 挂起点

回想下协程的创建过程,我们的协程体本身就是一个 Continuation 实例,正因如此,挂起函数才能在协程体内运行。在协程体内部,挂起函数的调用处称为挂起点,挂起点如果出现异步调用,当前协程就会被挂起,直到对应的 Continuation.resume 函数被调用才会恢复执行。

异步调用如何发生,取决于 resume 函数与对应的挂起函数的调用是否在相同的调用栈上

3.2.3 CPS 变换

我们知道,挂起函数如果需要挂起,则需要通过 suspendCoroutine 来获取 Continuation 实例,我们已经知道它是协程体封装成的 Continuation,但是这个实例是怎么传入的呢?先看下面的suspend 函数:

1
2
3
suspend fun notSuspend() = suspendCoroutine<Int> { continuation ->
continuation.resume(100)
}

看起来这个方法没有接收任何参数,kotlin 中看不出来我们就用 Java 直接调用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
Object result = KotlinMainKt.notSuspend(new Continuation<Integer>() {
@NotNull
@Override
public CoroutineContext getContext() {
return null;
}

@Override
public void resumeWith(@NotNull Object o) {

}
});
}

我们发现用Java 调用的时候,需要传入一个 Continuation ,也就是说kotlin 中的 suspend() -> Int 类型 在Java 看来实际上是 Continuation -> Object 类型 !

这与我们平时写的异步回调相似,传入callback 等待结果回调。但是为什么会有返回值 Object?这里的 Object 会有 2 种情况:

  • 挂起函数同步返回时:作为参数传入的 Continuation 的 resumeWith 不会被调用,函数实际地返回值 Object 就是挂起函数的返回值

  • 挂起函数挂起,执行异步逻辑。此时函数返回值 Object 是一个挂起标志,通过这个标志外部协程就可以知道该函数需要挂起等到异步逻辑执行。

挂起标志是一个常量,定义在 Intrinsics.kt 当中:

1
2
3
4
5
6
7
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons {
COROUTINE_SUSPENDED,
UNDECIDED,
RESUMED
}

现在大家知道了原来挂起函数就是普通函数参数中多了一个 Continuation 实例,这也难怪普通函数不能调用挂起函数,但是挂起函数可以调用普通函数的原因

还可以仔细想想,为什么Kotlin 语法要求挂起函数一定要运行在协程体内或者挂起函数中呢?答案是:协程体或者挂起函数中都隐含了 Continuation 实例

3.3 协程的上下文

3.3.1 协程上下文的集合特征

协程的 Context 更像是个 List 结构,都有空的表示:

1
2
3
var list: List<Int> = emptyList()

var context: CoroutineContext = EmptyCoroutineContext

接下来,我们往里面添加数据(要记得将 list 和 context 设置为 var 而不是 val ,我就是因为设置 val 所以不能用 “+” 操作):

1
2
3
4
list += 0   //添加一个元素,得到一个新的list
list += listOf<Int>(1,2,3) //将listOf中的元素都添加进去,生成一个新的 list

context += EmptyCoroutineContext

协程 Context 是一个集合,那么它的元素类型是什么呢?看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
}



/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
//省略其他代码
}

Element 是本身也实现了 CoroutineContext 接口,这看上去就好像 Int 实现了 List 接口一样,这就很奇怪。其实这主要是为了 API 设计方便,Element 中是不会存放除了它自己以外的其他数据的(这句话其实不太明白,还需要后续的理解

Element 中有个属性 key ,这个属性很关键,虽然我们往 list中添加元素时没有明确指出,但是我们都知道 list 中的元素都有一个 index 索引,而这里的协程上下文Element 的 key 就是这个集合中元素的索引,不同之处是这个索引“长”在数据里面,意味着上下文的数据在出生时就找到了自己的位置(这句话同样不太理解。。。)

可能有人觉得协程 Context 和 Map 似乎更近,为什么这里要与 List 对比呢?一是 List 的 Key 类型是固定的 Int ,而 Map 的Key 有很多种类型;二是是协程上下文内部实现是一个单链表,这也反映出它与 List 之间的关系。

3.3.2 协程上下文元素的实现

上一节知道协程 Conext 是个接口,实际上还有个抽象类 AbstractCoroutineContextElement,能让我们实现协程上下文更加方便:

1
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

创建元素并不难,只需要提供对应的 Key 即可,以下是协程名的实现(系统源码):

1
2
3
4
5
6
7
8
9
10
11
public data class CoroutineName(
/**
* User-defined coroutine name.
*/
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
/**
* Key for [CoroutineName] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineName>
}

以下是协程异常处理器的实现源码:

1
2
3
4
5
6
7
8
class CoroutineExceptionHandler(val onErrorAction: (Throwable) -> Unit) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

fun onError(error: Throwable) {
error.printStackTrace()
onErrorAction(error)
}
}

3.3.3 协程上下文的使用

前面说了,可以为协程上下文添加多个 Context,添加好之后,我们可以将 Context 绑定到协程上了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend {
println("我在协程里面")
5
}.createCoroutine(object: Continuation<Int> {
var tempContext: CoroutineContext = CoroutineName("name-01") +
CoroutineExceptionHandler(){
println("出错啦")
}

override val context: CoroutineContext = tempContext

override fun resumeWith(result: Result<Int>) {
// todo
}
})

先自己构建出 tempContext 然后再将 tempContext 赋值给 context。绑定了协程的上下文,我们的协程就初步成型了!接下来演示如何使用这个CoroutineExceptionHandler:

1
2
3
4
5
override fun resumeWith(result: Result<Int>) {
result.onFailure {
context[CoroutineExceptionHandler]?.onError(it)
}
}

不管结果如何,这个 resumeWith 是一定会被调用的,如果有异常出现,我们就从协程上下文找到 CoroutineExceptionHandler 实例,调用它的 onError 方法即可,这个上下文在协程内部都是可以直接获取的,比如,在协程内部获取名字:

1
2
3
4
suspend {
println("协程名字:${coroutineContext[CoroutineName]?.name}")
5
}.createCoroutine(object: Continuation<Int>

这样,我们就知道了协程上下文的设置和获取方法了。

3.4 协程的拦截器

之前的内容知道 Kotlin 协程通过调用挂起函数实现挂起,通过Continuation 的恢复调用来实现恢复,还可以通过 Context 的设置来丰富协程能力,那么,如果处理线程的调度?其实标准库还提供了拦截器(Interceptor)的组件,允许我们拦截协程异步回调时的恢复调用,那么线程调度应该也不是什么难事。

3.4.2 拦截器的使用

拦截器 ContinuationInterceptor 继承了 CoroutineContext.Element ,而Element 又继承 CoroutineContext 类型,所以拦截器也是 上下文 的一种实现

1
2
3
4

public interface ContinuationInterceptor : CoroutineContext.Element

public interface Element : CoroutineContext

自己定义拦截器只需要实现拦截器接口 ContinuationInterceptor 即可,比如打印日志(注意:拦截器的 Key 是一个固定值: ContinuationInterceptor,协程执行时会拿到拦截器并拦截):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class LogInterceptor() : ContinuationInterceptor{
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return LogContinuation(continuation)
}
}

class LogContinuation<T> (private val continuation: Continuation<T>): Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("执行前")
continuation.resumeWith(result)
println("执行后")
}
}

接下来使用的时候,我们就将拦截器作为context就好了,前面说了拦截器本来就是上下文:

1
2
3
4
5
6
7
8
9
10
11
12
fun haha (){
val continuation = suspend {
println("我在协程里面")
5
}.startCoroutine(object : Continuation<Int>{
override val context: CoroutineContext = LogInterceptor()

override fun resumeWith(result: Result<Int>) {
//。。。
}
})
}

3.4.3 拦截器的执行细节

3.5 Kotlin 协程所属类别

2.2 协程的分类

可以按照按照调用栈(函数调用栈,用来保存函数调用时的状态信息的数据结构)来分,因为协程需要挂起和恢复,因此对于挂起点的状态保存就极其关键。

2.2.1 按照调用栈分类

根据是否开辟相应的调用栈来分的话可以分为 2 类:

  • 有栈协程,需要额外的栈空间,但是灵活,可以在任意函数调用层级的任意位置挂起

  • 无栈协程, 节省内存,咱 Kotlin 协程一般认为就归为这一类,依靠对协程体本身编译生成的状态机的状态流转来实现控制流

不过,Kotlin 协程可以在挂起函数范围内任意调用层次挂起,换句话说,我们启动一个 Kotlin 协程,可以在其中任意嵌套 suspend 函数 !

2.2.2 按调度方式分类

  • 对称协程: 任何一个协程都是相互独立且平等的,调度权可以在任意协程之间转移

  • 非对称协程:协程出让调度权的目标只能是它的调用者,即协程之间存在调用和非调用关系

可以看出来,对称协程和线程已经非常接近了

2.3 协程的实现举例

列举了各种语言协程的实现,这里先略过。

高并发场景下常见的设计模式可能存在线程安全问题,比如传统的单例模式就是典型。本章介绍几种高并发场景下常用的几种模式:线程安全的单例模式、ForkJoin模式、生产者-消费者模式、Master-Worker模式和 Future模式。

8.1 线程安全的单例模式

没有volatile 情况下的双重检查实现的单例模式,可能会存在问题,可以看到下面这句代码:

instance = new Singleton();

转换成(具有原子性的)汇编指令大致会分为 3个:

  1. 分派一块内存 M

  2. 在 M 上初始化 Singleton 对象

  3. M 的地址赋值给 instance 变量

但是,以前说过可能会进行重排序,上面 3 个指令优化之后可能会变为:

  1. 分派一块内存 M

  2. M 的地址赋值给 instance 变量

  3. 在 M 上初始化 Singleton 对象

指令重排后,获取单例可能导致问题发生,假设 A 、B线程过来获取单例:

  1. A 通过 getInstance() 方法,执行到分配一块内存并将地址赋值给 instance,恰好发生了线程切换,此时,A 还没来得及对 M 指向的内存初始化

  2. 线程 B 进入 getInstance() 方法,判断 instance 不为空,于是 B 直接获取到了未初始化的 instance

  3. 线程 B 使用未初始化完全的对象 instance 在访问 instance 的成员变量时可能会发生异常

所以需要添加 volatile 防止指令重排。

8.1.5 使用静态内部类实现懒汉式单例模式

虽然通过 双重锁检查+volatile相结合的方式能实现高性能、线程安全的单例模式,但是该实现的底层原理比较复杂、实现繁琐,另一种易于理解、编程简单的单例模式实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Singleton {
//私有构造函数
private Singleton() {}

//获取单例的方法
public static final Singleton getInstance() {
return LazyHolder.INSTANCE;
}

private static class LazyHolder {
private static final Singleton INSTANCE = new Singleton();
}
}

这种方式只有在调用 getInstance() 的时候才会初始化单例,该方式既解决了线程安全问题,又解决了写法繁琐的问题。书中推荐使用这种方案。

8.2 Master-worker 模式

这是一种常见的高并发模式,它的核心思想是:任务的调度和执行分离,调度任务的角色是 Master,执行任务的角色是 Worker,Master 负责接收、分配和合并(Merge)任务结果,Worker 负责执行任务。

8.2.1 Master-worker 模式的参考实现

Master 的参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

public class Master<T extends Task, R> {
// 所有Worker的集合
private HashMap<String, Worker<T, R>> workers = new HashMap<>();


// 任务的集合
private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();

//任务处理结果集合
protected Map<String, R> resultMap = new ConcurrentHashMap<>();

//Master的任务调度线程
private Thread thread = null;

//保持最终的和
private AtomicLong sum = new AtomicLong(0);

public Master(int workerCount) {
// 每个Worker对象都需要持有queue的引用,用于领任务与提交结果
for (int i = 0; i < workerCount; i++) {
Worker<T, R> worker = new Worker<>();
workers.put("子节点: " + i, worker);
}
thread = new Thread(() -> this.execute());
thread.start();
}

// 提交任务
public void submit(T task) {
taskQueue.add(task);
}

//获取worker结果处理的回调函数
private void resultCallBack(Object o) {
Task<R> task = (Task<R>) o;
String taskName = "Worker:" + task.getWorkerId() + "-" + "Task:" + task.getId();
R result = task.getResult();
resultMap.put(taskName, result);
sum.getAndAdd((Integer) result); //和的累加
}

// 启动所有的子任务
public void execute() {

for (; ; ) {
// 从任务队列中获取任务,然后Worker节点轮询,轮流分
配任务
for (Map.Entry<String, Worker<T, R>> entry :workers.entrySet()) {
T task = null;
try {
task = this.taskQueue.take();

//获取任务
Worker worker = entry.getValue(); //
获取节点
worker.submit(task, this::resultCallBack); //分配任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 获取最终的结果
public void printResult() {
Print.tco("----------sum is :" + sum.get());
for (Map.Entry<String, R> entry : resultMap.entrySet()) {
String taskName = entry.getKey();
Print.fo(taskName + ":" + entry.getValue());
}
}
}

Worker 的代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Worker<T extends Task, R> {
//接收任务的阻塞队列
private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();
//Worker 的编号
static AtomicInteger index = new AtomicInteger(1);
private int workerId;
//执行任务的线程
private Thread thread = null;

public Worker() {
this.workerId = index.getAndIncrement();
thread = new Thread(() -> this.run());
thread.start();
}

/**
* 轮询执行任务
*/
public void run() {
// 轮询启动所有的子任务
for (; ; ) {
try {
//从阻塞队列中提取任务
T task = this.taskQueue.take();
task.setWorkerId(workerId);
task.execute();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//接收任务到异步队列
public void submit(T task, Consumer<R> action) {
task.resultAction = action; //设置任务的回调方法
try {
this.taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

8.3 FokJoin模式

目前没心思看,先略过

8.4 生产者-消费者模式

略过

8.5 Future模式

Java的基础容器主要有 List、Set、Queue、Map 四大类,但是大家熟知的 ArrayList、LinkedList 、HashMap 等都不是线程安全的。为了解决安全问题,Java用内置锁提供了一套线程安全的同步容器类,但是效率不高;因此,JUC提供了一套高并发容器。

7.1 线程安全的同步容器类

Java 同步容器类通过 synchronized 来实现同步的容器,比如 HashTable、Vector 以及 SynchronizedList 等容器,另外,Java 还提供了一组包装方法,将一个普通的基础容器包装成线程安全的同步容器,例如通过 Collections.synchronizedMap() 包装方法能将Map 包装成线程安全的 Map,看代码就能知道其原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//java.util.Collections

public static <K,V> Map<K, V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}

private static class SynchronizedMap<K, V> implements Map<K,V>,Serializable {
private final Map<K,V> m;
final Object mutext;

SynchronizedMap(Map<K,V> m) {
this.m = Objects.requeireNonNull(m);
this.mutext = this;
}

public int size () {
synchronized (mutext) {
return m.size();
}
}

public int isEmpty () {
synchronized (mutext) {
return m.isEmpty();
}
}

....省略其他方法
}

通过上述代码可以看出,Collections 提供的包装方法实现步骤:首先实现了容器的操作接口,在操作接口上使用 synchronized 进行线程同步,然后在 synchronized 临界区将实际的操作委托给被包装的基础容器。

7.1.1 同步容器面临的问题

由前面的描述可知:同步容器实现线程安全的方式是(包括HashTable之类的以及 Collections包装类之类的)*在需要同步访问的方法上添加关键字 synchonized *。所以效率并不高。

7.2 JUC 高并发容器

为了解决同步容器的性能问题,有了 JUC 高并发容器。高并发容器是基于非阻塞算法(也说无锁编程算法)实现的容器类,主要通过 CAS(Compare And Swap) + volatile 组合实现,其中 CAS 保证原子性,volatile 保证可见性。其主要优点如下:

  • 开销小:无需在内核态和用户态来回切换
  • 读写不互斥: 读读操作之间可以不互斥,只有写操作需要使用基于 CAS 机制的乐观锁

7.3 CopyOnWriteArrayList

很多应用场景读操作可能会远远大于写操作,由于读操作不会修改原有数据,因此每次读取都要加锁其实是一种浪费。

7.3.2 CopyOnWriteArrayList 原理

CopyOnWrite(写时复制)就是在修改器对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存复制一份,在新的内存中进行写操作,写完之后,再将原来的指针(引用)指向新的内存,原来的内存GC 。CopyOnWriteArrayList 的核心成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable,Serializable {
private static final long serialVersionUID = 8673264195747942595L;

/** 对所有的修改器方法进行保护,访问器方法并不需要保护 */
final transient ReentrantLock lock = new ReentrantLock();

/** 内部对象数组,通过 getArray/setArray 方法访问 */
private transient volatile Object[] array;

/**
*获取内部对象数组
*/
final Object[] getArray() {
return array;
}

/**
*设置内部对象数组
*/
final void setArray(Object[] a) {
array = a;
}
// 省略其他代码
}

7.3.3 CopyOnWriteArrayList 的读取操作

读取操作没有任何同步操作和锁控制,理由是内部数组array 不会发生修改,只会被另一个 array 替换,因此可以保证数据安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** 操作内存的引用*/
private transient volatile Object[] array;

public E get(int index) {
return get(getArray(), index);
}

//获取元素
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}

//返回操作内存
final Object[] getArray() {
return array;
}

7.3.4 CopyOnWriteArrayList 写入操作

写入操作 add() 方法在执行时加了独占锁以确保只能有一个线程进行写入操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
Object[] elements = getArray();
int len = elements.length;

// 复制新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock(); // 释放锁
}
}

每次进行添加操作时,都会重新复制一份数组,再往新数组中添加元素,添加完了,再将array 引用指向新的数组。也就是当add() 操作完成后,array 的引用就已经指向新的存储空间了

7.3.5 CopyOnWriteArrayList 优缺点以及对比(自己加的章节)

7.3.5.1 优缺点(自己加的章节)

  • 优点:高并发操作下读取、遍历操作不需要同步,速度非常快,适用于“读多写少“的场景
  • 缺点: 每次添加要复制一份,增加内存开销

7.3.5.2 比较(自己加的章节)

CopyOnWriteArrayList 和 ReentrantReadWriteLock (读写锁) 的思想非常类似,ReentrantReadWriteLock 的泗县时:读读共享、写写互斥、读写互斥、写读互斥,而 CopyOnWriteArrayList 更进一步了:为了将读取的性能发挥到极致,读取时完全不加锁。

7.4 BlockingQueue

在多线程环境中,通过 BlockingQueue (阻塞队列) 可以很容易实现多线程之间的数据共享和通信,比如在经典的“生产者-消费者“模型中,通过 BlockingQueue 可以完成一个高性能版本。

7.4.3 常见的 BlockingQueue

BlockingQueue 的实现类大概有 :ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、
PriorityBlockingQueue、SynchronousQueue等。

7.4.3.1 ArrayBlockingQueue

ArrayBlockingQueue 内部采用定长数组来存储元素,添加和删除操作采用同一个锁对象,也就是说添加和删除无法并行运行(为什么不能并行呢?因为作者认为ArrayBlockingQueue的写入和获取操作已经足够轻量了)

为什么 ArrayBlockingQueue 比 LinkedBlockingQueue 更加常用?因为前者添加或者删除的时候不会产生或者销毁任何额外的 Node 实例,在高并发场景下,这可以减轻系统 GC 压力

7.4.3.2 LinkedBlockingQueue

LinkedBlockingQueue 是基于链表的阻塞队列,对于添加和删除元素分别才用了独立的锁控制,也就是在高并发场景下,消费者和生产者可以并行地操作队列中数据。

需要注意的是,新建 LinkedBlockingQueue 时如果没有指定其容量大小,则默认大小近乎无限(Integer.MAX_VALUE),这样的话,一旦生产速度大于消费速度,也许还没等到队列满阻塞产生,系统内存就消耗光了。

7.4.3.3 DelayQueue

DelayQueue 只有当其指定的延迟时间到了才能够从队列中取该元素,它是一个没有大小限制的队列,因此添加(生产者)永远不会被阻塞,只有获取数据(消费者)才会被阻塞。

DelayQueue 的适用场景较少,常见的例子是用来管理一个超时未响应的连接队列

7.4.3.4 PriorityBlockingQueue

PriorityBlockingQueue 和 DelayQueue 类似,它也不会阻塞生产者,只会在没有可消费的数据时阻塞消费者。

7.4.3.5 SynchronousQueue

SynchronousQueue 是一种无缓冲的等待队列,不像 LinkedBlockingQueue 有中间缓冲区,所以吞吐率相对而言会低一些。不过对于单个任务来说,正因为没有缓冲区,所以响应会快一些。

LinkedBlockingQueue、DelayQueue 以及 PriorityBlockingQueue 都需要注意生产速度不能快于消费者,否则容易耗光内存。

7.4.4 ArrayBlockingQueue 的基本使用

用 ArrayBlockingQueue 队列实现一个生产者-消费者的案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
 // 省略import
public class ArrayBlockingQueuePetStore {

public static final int MAX_AMOUNT = 10; //数据区长度

//共享数据区,类定义
static class DataBuffer<T> {
//使用阻塞队列保存数据
private ArrayBlockingQueue<T> dataList = new ArrayBlockingQueue<>(MAX_AMOUNT);

// 向数据区增加一个元素,委托给阻塞队列
public void add(T element) throws Exception {
dataList.add(element); //直接委托
}

/**
* 从数据区取出一个商品,委托给阻塞队列
*/
public T fetch() throws Exception {
return dataList.take(); //直接委托
}
}

public static void main(String[] args) throws InterruptedException {
Print.cfo("当前进程的ID是" + JvmUtil.getProcessID());
System.setErr(System.out);
//共享数据区,实例对象
DataBuffer<IGoods> dataBuffer = new DataBuffer<>();
//生产者执行的操作
Callable<IGoods> produceAction = () -> {
//首先生成一个随机的商品
IGoods goods = Goods.produceOne();
//将商品加上共享数据区
dataBuffer.add(goods);
return goods;
};
//消费者执行的操作
Callable<IGoods> consumerAction = () -> {
// 从PetStore获取商品
IGoods goods = null;
goods = dataBuffer.fetch();
return goods;
};
// 同时并发执行的线程数
final int THREAD_TOTAL = 20;
// 线程池,用于多线程模拟测试
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);

// 假定共11个线程,其中有10个消费者,但是只有1个生产者
final int CONSUMER_TOTAL = 11;
final int PRODUCE_TOTAL = 1;

for (int i = 0; i < PRODUCE_TOTAL; i++) {
//生产者线程每生产一个商品,间隔50毫秒
threadPool.submit(new
Producer(produceAction, 50));
}
for (int i = 0; i < CONSUMER_TOTAL; i++){
//消费者线程每消费一个商品,间隔100毫秒
threadPool.submit(new
Consumer(consumerAction, 100));
}
}
}

7.4.6 非阻塞式添加元素 add()、offer() 方法的原理

首先来看非阻塞式添加元素,在队列满而不能添加元素时,非阻塞式添加元素的方法会立即返回,所以线程不会被阻塞。add() 方法的实现如下:

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

可以看出直接调用了 offer 方法,如果 offer 方法添加失败,直接抛出异常,否则返回true。

offer() 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//offer()方法
public boolean offer(E e) {
checkNotNull(e); //检查元素是否为null
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
if (count == items.length)//判断数组是否已满
return false;
else {
enqueue(e);//添加元素到队列
return true;
}
} finally {
lock.unlock();
}
}

可以看到,offer() 方法的操作如下:

  1. 如果数组满了,就直接释放锁,返回false
  2. 数组没满,将元素加入队(通过enqueue()方法)然后返回true

7.4.7 阻塞式添加元素:put() 方法的原理

put() 方法是一个阻塞方法,如果队列元素已满,那么当前线程会被加入 notFull 条件等待队列中,直到有空位置才会被唤醒执行添加操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//put()方法,阻塞时可中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中,等待被唤醒
notFull.await();
enqueue(e);//如果队列没有满,就直接添加
} finally {
lock.unlock();
}
}

总结一下put()流程:

  1. 获取 putLock 锁
  2. 如果队列满了,就被阻塞,put线程进入 notFull 等待队列,等着被唤醒
  3. 如果队列未满,通过 enqueue 方法入队
  4. 释放 putLock 锁

7.4.8 非阻塞式删除元素: poll() 方法

当队列空而不能删除元素时,非阻塞删除元素的方法会立即返回,执行线程不会被阻塞。poll() 方法实现:

1
2
3
4
5
6
7
8
9
10
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判断队列是否为null,不为null执行dequeue()方法,否则返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

里面使用了 dequeue() 方法出队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//删除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要删除的对象
E x = (E) items[takeIndex];
//清空位置:将数组中的takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等
//如果相等就说明已到尽头,恢复为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//元素个数减1
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
//删除了元素说明队列有空位,唤醒notFull条件等待队列中的put线程,执行添加操作
notFull.signal();
return x;
}

主要注意后面的通过 notFull.signal() 唤醒条件等待队列中的一个 put 线程。阻塞式的 take() 方法略。

7.5 ConcurrentHashMap

在 Java 7 之前版本 ConcurrentHashMap 采用分段锁实现,数据分为一段一段的,每段分配一把锁,当一个线程访问其中一段数据的时候,其他段的数据能被正常访问,实现了真正的并发访问;Java8对内部存储结构进行了优化,性能进一步提升。

7.5.1 HashMap 和 HashTable 的问题

HashMap 不是线程安全的,多线程环境下,HashMap 的 put() 操作可能会引起死循环,导致CPU使用率接近100%。于是JDK提供了线程安全的Map-HashTable,它使用几乎与 HashMap 几乎一样区别有2点:

  • HashTable 不允许key 和value 为null
  • HashTable 的包括 get/set 在内的方法都是用 synchronized 来保证线程安全,对整个 Hash 表锁定,但是代价会非常大的

7.5.2 Java 1.7 及以前版本的 ConcurrentHashMap

分段锁是一种锁设计,并不是具体的锁。对于 ConcurrentHashMap 而言,分段锁技术将 key 分成一个个小 segment 存储,给每段数据一把锁,当一个线程占用锁访问其中一段数据时,其他段数据也能被其他线程访问,实现真正的并发。

这个原理已经比较了解了,这里就不按照书本的章节走,略过了

7.5.4 JDK 1.8版本 ConcurrentHashMap 的结构

1.7 版本虽然通过 segment 方式实现了并发热点分离,默认情况下将一个table 分裂成 16 个小的 table(Segment表示),从而在 Segment 维度实现并发。但是这样并发粒度还不够细。1.8 版本抛弃了 Segment 分段锁机制,存储结构采用数组+链表或者红黑树的组合方式,将并发粒度细化到每一个桶,进一步细化了热点,利用 CAS + Synchronized 来保证并发更新安全。

JDK 1.7 的 ConcurrentHashMap 每个桶为链表结构,1.8 引入了红黑树结构,当桶的节点超过阈值(默认64)时,自动将链表结构转换为红黑树,可以理解为将链式桶转为树状桶。这样的好处在于,访问的时候只需要对一个桶锁定,而不需要将整个 Map 集合都进行粗粒度锁定。事实上,引入红黑树的一个原因是:链表查询复杂度 O(n) ,红黑树查询复杂度 O(log(n))

一个1.8版本的ConcurrentHashMap实例内部结构

7.5.5 ConcurrentHashMap的核心原理-1.8 版本

JDK 1.8版本的ConcurrentHashMap中通过一个 Node<K,V>[] 数组table 来保存添加到哈希表中的桶,在同一个 Bucket 位置是通过链表和 红黑树的形式来保存的,但是 table 是懒加载的,只有在第一次添加元素的时候才会初始化。它的主要成员属性大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ConcurrentHashMap<K,V> extends
AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {

private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
//常量:表示正在转移
static final int MOVED = -1;
// 常量:表示已经转换成树
static final int TREEBIN = -2;
// 常量:hash for transient reservations
static final int RESERVED = -3;
// 常量:usable bits of normal node hash
static final int HASH_BITS = 0x7fffffff;
//数组,用来保存元素
transient volatile Node<K,V>[] table;
//转移时用的数组
private transient volatile Node<K,V>[] nextTable;
/**
* 用来控制表初始化和扩容的控制属性
*/
private transient volatile int sizeCtl;

// 省略其他
}

重要属性介绍如下:

  • table用于保存添加到哈希表中的桶
  • DEFAULT_CAPACITY: table的默认长度。默认初期长度为16,在第一次添加元素时,会将table初始化成16个元素的数组
  • sizeCtl:sizeCtl用来控制table的初始化和扩容操作的过程

涉及修改 sizeCtl 的方法有 5 个:

  • initTable():初始化哈希表时,涉及sizeCtl的修改
  • addCount():增加容量时,涉及sizeCtl的修改
  • tryPresize():ConcurrentHashMap扩容方法之一
  • transfer():table数据转移到 nextTable,扩容操作的核心在于数据的转移,把旧数组中的数据前一到新的数组。ConcurrentHashMap可以利用多线程来协同扩容,简单说是把 table 数组当做多个线程之间共享的任务队列,然后通过维护一个指针来划分每个线程锁负责的取件,一个已经迁移完的 Bucket 会被替换为一个 ForwardingNode 节点,标记当前 Bucket 已经被其他线程迁移完成。
  • helpTransfer():并发添加元素时,如果正在扩容,其他线程会帮助扩容,也就是多线程扩容。

7.5.6 JDK 1.8 版本 ConcurrentHashMap的核心源码

下面来看JDK 1.8版本ConcurrentHashMap的put()操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//自旋:并发情况下,也可以保障安全添加成功
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
if (tab == null || (n = tab.length) == 0) {
//第一次添加,先初始化node数组
tab = initTable();
} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//计算出table[i]无节点,创建节点
//使用Unsafe.compareAndSwapObject 原子操作table[i]位置
//如果为null,就添加新建的node节点,跳出循环
//反之,再循环进入执行添加操作
if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))) {
break;
}
} else if ((fh = f.hash) == MOVED) {
//如果当前处于转移状态,返回新的tab内部表,然后进入循环执行添加操作
tab = helpTransfer(tab, f);
} else {
//在链表或红黑树中追加节点
V oldVal = null;
//使用synchronized 对 f 对象加锁
// f = tabAt(tab, i = (n - 1) & hash) : table[i] 的node对象(桶)
//注意:这里没用ReentrantLock,而是使用 synchronized 进行同步
//在争用不激烈的场景中,synchronized 的性能和 ReentrantLock不相上下
synchronized (f) {
if (tabAt(tab, i) == f) {
//在链表上追加节点
if (fh >= 0) {
binCount = 1;
for (Node<K, V> e = f; ; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) {
e.val = value;
}
break;
}
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key, value, null);
break;
}
}
}

//在红黑树上追加节点
else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) p.val = value;
}
}
}
}

if (binCount != 0) {
//节点数大于临界值,转换成红黑树
if (binCount >= TREEIFY_THRESHOLD) {
treeifyBin(tab, i);
}

if (oldVal != null) {
return oldVal;
}
break;
}
}
}
addCount(1L, binCount);
return null;
}

从源码可以看出,使用 CAS 自旋完成桶的设置时,使用 synchronized 内置锁保证桶内并发操作的线程安全。尽管对同一个 Map 操作的线程争夺会非常激烈,但是在同一个桶内的线程争夺通常不会很激烈,所以使用 CAS 自旋、synchronized 的偏向锁或轻量级锁 不会降低 ConcurrentHashMap 的性能。为什么不用显式锁 ReentrantLock 呢?因为如果为每个桶都创建一个 ReentrantLock 实例,就会带来大量的内存消耗,而前面那些方法带来的内存消耗微乎其微。

get方法也没有加锁操作,与 JDK1.7差不多,就不赘述了。

1.2 异步程序设计的关键问题

1.2.1 结果传递

不同于同步调用,异步调用是立即返回的,因此被调用方的逻辑通常存在2种情形:

  • 结果尚未就绪:进入任务执行的状态,等结果就绪后通过回调传递给调用方

  • 结果已经就绪,可以立即提供结果

用图片获取作为例子,上述2种情况代码示意如下所示(这是我个人觉得理解Kotlin协程实现最巧妙的例子了,其实后面的本质就是这个):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 方法实现
fun asyncBitmap(url: String, callback: (Bitmap) -> Unit): Bitmap {
val bitmap = asyncBitmap(url) {
show(it) //...② 异步请求
}

if (bitmap != null) {
show(bitmap) //... ① 直接返回了
} return when (val bitmap = Cache.get(url)) {
null -> {
thread {
download(url)
.also { Cache.put(url, it) }
.also(callback)
}
}

else -> bitmap
}
}


//使用方法,如果有同步返回,就使用同步返回,否则依赖异步回调
@JvmStatic
fun main(args: Array<String>) {
val bitmap = asyncBitmap(url) {
show(it) //...② 异步请求
}

if (bitmap != null) {
show(bitmap) //... ① 直接返回了
}
}

Kotlin 协程的挂起函数 (suspend function) 本质上就是采取了这个异步返回值的设计思路

1.2.2 异常处理

我们希望异步的任务在 suspend 的时候执行,执行完成后 resume 回来,之后再同步判断是否发生了异常。这样逻辑就会简单很多。异步逻辑同步化也是 Kotlin 协程要解决的问题。

1.2.4 复杂分支

为同步操作添加分支甚至循环操作是很容易的,比如获取图片是同步的情况下,获取多个图片:

1
val bitmap = urls.map { syncBitmap(it) }

在上述情况下我们甚至还能方便地通过一个 try-catch 捕捉所有异常。而当获取图片的操作是异步的情况时,就变得复杂,还需要用到一些同步工具来辅助:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val countDownLatch = CountDownLatch(urls.size)

val map = urls.map { it to EMPTY_BITMAP }
.toMap(ConcurrentHashMap<String, Bitmap>())

urls.map { url ->
asyncBitmap(url, onSuccess = {
map[url] = it
countDownLatch.countDown() //...②
}, onError = {
showError(it)
countDownLatch.countDown()//...③
})
}

countDownLatch.await() //...①

//获取所有的图片 bitmap
val bitmaps = map.values

这里还需要借助 CountDownLatch 门闩 来实现,因此在 ① 处会阻塞当前线程,直到所有回调的 ② 或 ③ 位置执行之后,才会执行获取所有图片的操作:val bitmaps = map.values ,总之,这个过程还是挺复杂的。

EMPTY_BITMAP 是一个空的 Bitmap 对象,为什么要这么做呢?因为 ConcurrentHashMap 不允许 value 为空!

1.3 常见异步程序设计思路

1.3.1 Future

但是 Future.get() 方法会造成当前调用阻塞。

1.3.2 CompletableFuture

它的 get() 方法无需阻塞,异步调用不阻塞主流程调用但是结果脱离了主调用流程(需要回调获取)

1.3.3 JavaScript 的 Promise——async/await

比较完好地实现了需求,他们 async/await 的语义稍微不同。

1.3.5 Kotlin协程

Kotlin 协程是为异步程序设计而生的。

有人称协程只是“一个线程框架”,认为协程就是用来切换线程的,显然有点“一叶障目不见泰山”了

Kotlin 协程用一个 suspend 关键字,包含了异步调用回调 两层含义。我们知道,所有异步回调对于当前调用流程只是一个挂起点。在这个挂起点可以做的事情非常多:既可以做异步回调,还可以添加调度器来处理线程切换,还可以作为协程取消响应的位置。

看一个 Kotlin 处理异步调用的例子:

1
2
3
4
5
6
7
8
9
10
suspend fun bitmapSuspendable(url: String): Bitmap =
suspendCoroutine<Bitmap> { continuation ->
thread {
try {
continuation.resume(download(url))
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}
}

suspend 修饰函数,意味着支持同步化的异步调用。上述代码中,

  • continuation.resume(download(url)) 将正常的结果返回

  • continuation.resumeWithException(e) 则是将异常返回,在调用 bitmapSuspendable() 方法时,如果产生异常会把这个异常抛出

效果如下:

1
2
3
4
5
6
7
8
suspend fun main() {
try {
val bitmap = bitmapSuspendable("xxxx")
//todo 正常图片处理
} catch (e: Exception) {
//todo 异常处理
}
}

从 1.3.0 开始,开始支持 suspend fun main 作为函数入口了

前面介绍在激烈争用的情况下,CAS 自旋实现的轻量级锁会有两大问题:

  • CAS 恶性空自旋会浪费大量 CPU 资源
  • 某些架构 CPU 上可能会导致 “总线风暴“

解决这些问题的常见方案有 2 种:

  • 分散操作热点
  • 使用队列削峰

JUC 使用队列削峰方案解决 CAS 性能问题,提供了一个基于双向队列的削峰基类——抽象基础类 AbstractQueuedSynchronizer(抽象同步器类,简称 AQS) 。JUC 中许多类都是基于AQS构建:例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。

6.1 锁与队列的关系

第5章介绍了 CLH ,它用了FIFO的单项队列;AQS 是 CLH 的一个变种,主要原理差不多,用的是 FIFO 的双向链表,这样做的好处就是可以从任意一个节点开始很方便地访问前驱和后继节点

6.2 AQS 的核心成员

6.2.1 状态标志位

AQS 中维持了一个单一的 volatile 修饰 int 类型的状态信息 state ,它标记了锁的状态,默认初始状态 0 为未锁定状态。同时,提供了 compareAndSetState 原子设置方法来设置 state 的值。

当线程 A 通过 tryAcquire() 获取到独占锁并将 state 加一后,其他线程通过 tryAcquire 获取锁就会失败(执行compareAndSet(0,1)会失败),直到 A 释放了锁为止,其他线程才能获取锁。

AQS 继承了 AbstractOwnableSynchronizer ,父类中有个当前占用该锁的线程的变量 exclusiveOwnerThread:

1
2
//表示当前占用该锁的线程
private transient Thread exclusiveOwnerThread;

6.2.2 队列节点类 Node

AQS 是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系,Node 的主要成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class Node {

//节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
//普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)
volatile int waitStatus;

//节点所对应的线程,为抢锁线程或者条件等待线程
volatile Thread thread;

//前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
volatile Node prev;

//后继节点
volatile Node next;

//若当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上
//此属性指向下一个条件等待节点,即其条件队列上的后继节点
Node nextWaiter;
...
}

解释一下 waitStatus 变量中的几个值:

  • CONDITION :waitStatus 取这个值时,表示该线程(调用了Condition 的 awati 方法后)在条件队列中阻塞(Condition 有使用),当持有锁的线程调用了 Condition 的 signal() 方法后,节点会从该 Condition 的等待队列转移到该锁的同步队(也就是AQS的FIFO双向队列)列中去竞争锁。
  • PROPAGATE:waitStatus 取这个值时,表示下一个线程获取共享锁后,自己的共享状态会被无条件传播下去,因为共享锁可能出现有N个锁可用,这时直接让后面 N 个节点都来工作。这种状态在 CountDownLatch 中用到了

6.3.1 模板模式

这种模式值得看下,AQS 也使用这种模式

6.4 通过 AQS 实现一把简单的独占锁

基于 AQS 实现一个简单的非公平的独占锁 SimpleMockLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class SimpleMockLock implements Lock {
//同步器实例
private final Sync sync = new Sync();

// 自定义的内部类:同步器
// 直接使用 AbstractQueuedSynchronizer.state 值表示锁的状态
// AbstractQueuedSynchronizer.state=1 表示锁没有被占用
// AbstractQueuedSynchronizer.state=0 表示锁没已经被占用
private static class Sync extends AbstractQueuedSynchronizer {
//钩子方法
protected boolean tryAcquire(int arg) {
//CAS更新状态值为1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

//钩子方法
protected boolean tryRelease(int arg) {
//如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}

//如果锁的状态为没有占用
if (getState() == 0) {
//抛出非法状态的异常
throw new IllegalMonitorStateException();
}
//接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
//设置状态
setState(0);
return true;
}

//显式锁的抢占方法
@Override
public void lock() {
//委托给同步器的acquire()抢占方法
sync.acquire(1);
}

//显式锁的释放方法
@Override
public void unlock() {
//委托给同步器的release()释放方法
sync.release(1);
}
// 省略其他未实现的方法
}
}

6.5 AQS 锁抢占的原理

文中前面讲了一大堆,实在没法梳理各个章节的联系,云里雾里的,直到用 ReentrantLock 来讲这个过程,就清晰了,所以前面一些内容略过。

直接以 ReentrantLock 抢锁来说明整个抢锁流程,ReentrantLock 有2种模式:公平锁 和 非公平锁。

6.8.1 ReentrantLock 非公平锁的抢占流程

ReentrantLock 为非公平所实现了一个内部的同步器——NonfairSync ,其显式锁获取方法 lock() 源码如下:

1
2
3
4
5
6
7
8
9
10
//非公平抢占
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
}

非公平性就体现在这里:如果占用锁的线程刚释放锁,state 为 0,而队列中排队等待锁的线程还未唤醒,新来的线程就直接抢占了该锁,那么就插队了。举个例子:假设 A、B 线程在排队等锁,但是此时不在队列中的 C 直接进行 CAS 操作成功了,拿到锁开开心心返回了,那么 A、 B 只能乖乖看着。

6.8.4 ReentrantLock 公平锁的抢占流程

ReentrantLock 为公平所实现了一个内部的同步器——FairSync ,其显式锁获取方法 lock() 源码如下:

1
2
3
4
5
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}

其核心思想是通过 AQS 模板方法 acquire 进行队列入队操作。

6.8.5 AQS模板方法 acquire(arg)

自己调整的章节,本来这节在前面,但是放前面又看不懂,用意也不太明确。

acquire(arg) 方法是 AQS 提供的利用独占的方式获取资源的方法,源码实现如下:

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

它的含义是:如果通过 tryAcquire(arg)方法尝试成功,则直接返回,表示已经抢到锁;否则,将线程加入等待队列。

6.8.6 AQS模板方法 tryAcquire(arg)

在 ReentrantLock 中,在公平锁状态和非公平锁状态下, tryAcquire 的实现是不一样的。

  • 公平锁状态下会判断是否是队头,是队头就允许CAS获取锁;如果不是就判断是否是重入,重入允许进入;否则就返回了false了(自己看代码总结的);
  • 非公平状态下还是会直接 CAS 抢锁了,不管队头这些了,这也是非公平锁的行为体现

5.1 显式锁

使用 Java内置锁 时,无需通过 Java 代码显式地对同步对象的监视器进行抢占和释放,使用起来非常方便。但是不具备一些比较高级的锁功能:

  • 限时抢锁:设置超时时长,不至于无限等下去

  • 可中断抢锁:抢锁时,外部线程给抢锁线程发一个中断信号,就能唤起等待锁的线程,并终止抢占过程

  • 多个等待队列:为锁维持多个等待队列,以提高锁的效率。比如生产者-消费者模式中,生产者和消费者公用一把锁,锁上维持2个队列:一个生产队列和一个消费者队列

5.1.3 使用显式锁的模板代码

因为 JUC 中的显式锁都实现了 Lock 接口,所以不同类型的显式锁对象使用的方法都是模板化的、套路化的,模板代码如下:

1
2
3
4
5
6
7
8
9
10
11

//创建锁对象,SomeLock 为 Lock 的某个实现类,如 ReentrantLock
Lock lock = new SomeLock();
//step 1: 抢占锁
lock.lock;
try {
//step2 : 抢锁成功,执行临界区代码
doSomething();
} finally {
lock.unlock(); //step3: 释放锁
}

模板代码有几个需要注意的点:

  • 释放锁操作 unlock 必须在 try-catch 的finally 中执行,否则如果临界区代码抛出异常,锁就可能永远也得不到释放了

  • 抢占锁的操作lock 必须在 try 语句之外,原因:lock 方法不一定能够抢锁成功(我猜测作者是想说 tryLock() 之类的方法不一定会获取成功),如果没有抢占到锁,也肯定不需要释放锁,在没有占有锁的情况下释放锁可能导致异常

  • 在抢占锁操作 lock 和 try 语句之间不要插入任何代码,避免抛出异常而无法执行到 try,进而无法释放锁。

5.1.4 基于显式锁进行“等待-通知”方式的线程间通信

Java 内置锁可以通过 Object 的 wait 和 notify 方法来实现简单的线程间通信,与此类似的是,基于 Lock 显式锁,JUC 也提供了一个用于线程间通信的接口 Condition

Condition 接口有2类主要方法:

  • await() : 在功能上与 Object.wait() 语意等效,线程会加入 await() 等待队列,并释放当前锁
  • signal() : 在功能上与 Object.notify() 语意等效,唤醒 await() 等待队列中的线程

为了避免与 Object 中的 wait/notify 2类方法在使用时发生混淆,JUC 对 Condition 接口方法改了名称,成为了 await/signal。Condition 对象的 signal 和同一个对象的 await 是一一配对使用的

Condition 对象是基于显式锁的,所以不能独立创建 Condition 对象,可以通过 lock.newCondition() 方法获取一个与当前显式锁绑定的 Condition 对象。用法举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

class WaitTarget implements Runnable {
@Override
public void run() {
lock.lock();//抢锁
try {
print("开始等待");
condition.await();//开始等待,并且释放锁
print("收到通知,开始继续执行");
} finally {
lock.unlock();//释放锁
}
}
}

使用 await 方法前必须要先获取锁,await 方法会让当前线程加入 Condition 的等待队列,同理, signal 方法也要在获取锁之后才能调用,调用 signal 之后一定要释放锁,只有这样被唤醒的等待线程才能抢锁。

5.1.5 LockSupport

LockSupport 是JUC 提供的一个 线程阻塞与唤醒的工具类。大体有2类方法(阻塞和唤醒):

1
2
3
4
5
6
// 无限期阻塞当前线程
public static void park();

// 唤醒某个被阻塞的线程
public static void unpark(Thread thread);

一个简单的演示的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class WaitTarget implements Runnable {
@Override
public void run() {

try {
print("即将进入阻塞");
LockSupport.park();//阻塞当前线程
if (Thread.currentThread().isInterrupted()) {
print("被中断了");
} else {
print("被重新唤醒");
}

} finally {
lock.unlock();//释放锁
}
}
}

5.1.5.1 LockSupport.park() 的对比(自己改的标题)

1、与 Thread.sleep() 的区别

  • Thread.sleep() 只能自己醒来,没法外部唤醒;LockSupport.park() 可以通过 unpark 唤醒
  • Thread.sleep() 声明了中断异常(InterruptedException) ,而LockSupport.park() 没有
  • 被中断的时候,虽然线程都会被设置中断标记,但是线程表现不同:sleep 会抛异常,park 不会

2、与 Object.wait() 的区别

  • wait 需要在同步块中执行,park 可以在任意地方执行
  • 当阻塞线程被中断时,wait 方法抛出中断异常;而park 不会抛出异常
  • 如果没有调用过 wait 而直接执行 notify 会导致 IllegalMonitorStateException异常;而未做park 直接做 unpark 不会有任何异常

自己看了下 LockSupport 的源码,发现 park 和 unpark 都是 native 方法,所以在代码层面就没对比了

5.1.6 显式锁分类

从多个维度分类: 可重入、悲观/乐观、公平、共享/独占、可中断/不可中断

5.2.2 通过 CAS 实现乐观锁

乐观锁通过 CAS 实现主要就是两个步骤:

  1. 冲突检测 (CAS 检测内存位置 V 的值是否为 A)
  2. 数据更新 (CAS 上述检测如果是,则将位置 V 更新为 B 值,否则不更改)

在实际使用中,仅仅进行一次 CAS 是不够的,一般情况下需要不断循环重试直到CAS 操作成功,也即自旋

乐观锁是一种思想,CAS 是这种思想的一种实现

作为演示,这里设计一个简单版本的不可重入(如果需要重入就count计数,这里不贴例子了)的自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyLock implements Lock {

//当前锁的拥有者
private AtomicReference<Thread> owner = new AtomicReference<>();

@Override
public void lock() {
Thread t = Thread.currentThread();
//书中的例子这里是写错了,它写成 while(owner.compareAndSet(null, t))
while(!owner.compareAndSet(null, t)) {//循环竞争锁
//没获取到锁,让出cpu
Thread.yield();
}
}

@Override
public void unlock() {
Thread t = Thread.currentThread();
if(t == owner.get()) {
owner.set(null);
}
}
}

5.2.5 CAS 可能导致“总线风暴”

为了保障“缓存一致性”,不同的内核需要通过总线来回通信,使用 lock 前缀(用于内存屏障)指令的 Java 操作(比如CAS、volatile)会产生缓存一致性流量,很多线程同时执行lock前缀操作时,会在总线上产生过多的流量,也就是 “总线风暴”。

那么,基于 JUC 实现的轻量级锁怎么避免总线风暴?答案是:使用队列对抢锁线程进行排队。

5.2.6 CLH自旋锁

CLH锁就是一种基于队列排队的自旋锁(由3个发明人的名字命名的),AQS 也是基于这种原理,为了说明其原理,这里实现一个 CLH 锁的学习版本,并不是真正的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class CLHLock implements Lock {
/**
* 当前节点的线程本地变量
*/
private static ThreadLocal<Node> curNodeLocal = new ThreadLocal();
/**
* CLHLock队列的尾部指针,使用AtomicReference,方便进行
* CAS操作
*/
private AtomicReference<Node> tail = new AtomicReference<>(null);

public CLHLock() {
//设置尾部节点
tail.getAndSet(Node.EMPTY);
}

//加锁操作:将节点添加到等待队列的尾部
@Override
public void lock() {
Node curNode = new Node(true, null);
Node preNode = tail.get();
//CAS自旋:将当前节点插入队列的尾部
while (!tail.compareAndSet(preNode, curNode)) {
preNode = tail.get();
}
//设置前驱节点
curNode.setPrevNode(preNode);

// 自旋,监听前驱节点的locked变量,直到其值为false
// 若前驱节点的locked状态为true,则表示前一个线程还在抢占或者占有锁
while (curNode.getPrevNode().isLocked()) {
//让出CPU时间片,提高性能
Thread.yield();
}
// 能执行到这里,说明当前线程获取到了锁
// Print.tcfo("获取到了锁!!!");
//将当前节点缓存在线程本地变量中,释放锁会用到
curNodeLocal.set(curNode);
}

//释放锁
@Override
public void unlock() {
Node curNode = curNodeLocal.get();
curNode.setLocked(false);
curNode.setPrevNode(null); //help for GC
curNodeLocal.set(null); //方便下一次抢锁
}

//虚拟等待队列的节点
@Data
static class Node {
public Node(boolean locked, Node prevNode) {
this.locked = locked;
this.prevNode = prevNode;
}

// true:当前线程正在抢占锁,或者已经占有锁
// false:当前线程已经释放锁,下一个线程可以占有锁了
volatile boolean locked;
// 前一个节点,需要监听其locked字段
Node prevNode;

// 空节点
public static final Node EMPTY = new Node(false, null);
}
// 省略其他代码
}

CLH 算法的几个要点就是(我理解的是,这种步骤就是公平锁环境下弄的,非公平锁不会每次头节点获得锁):

  1. 初始状态队列尾部(tail)指向一个 EMPTY节点,tail 节点使用 AtomicReference 类型是为了让多线程并发操作时安全
  2. Thread 在抢锁时会创建一个 Node 加入等待队列尾部(默认lock 属性为true),同时自己作为新的尾部,这些操作通过 CAS 自旋操作
  3. Node 加入之后,会循环判断前去节点的 lock 属性是否为false,如果为false,即前驱节点释放了锁,当前节点获得了锁
  4. 当前node 获得锁之后,将locked 属性设置为true
  5. 临界区代码执行完毕后,当前节点的 locked 置为 false,方便后续节点获取锁

5.4.2 死锁的监测与中断

JDK 8 中包含一个 ThreadMXBean 接口,提供多种监视线程的方法:

  • findDeadlockedThreads :用于检测由于抢占JUC显式锁、Java内置锁引起死锁的线程。
  • findMonitorDeadlockedThreads:仅仅用于检测由于抢占Java内置锁引起死锁的线程。

5.5 共享锁与独占锁

JUC 中的共享锁包括 Semaphore(信号量)、ReadLock(读写锁中的读锁)、CountDownLatch 倒数闩

5.5.2 共享锁 Semaphore

Semaphore 可以用来控制在同一时刻共享资源的线程数量,维护了一组虚拟许可。将 Semaphore 称为一个许可管理器 更形象。

5.5.2.1 Semaphore 使用示例

Semaphore 使用一个很形象的场景是银行排队办理业务,只有 N 个窗口,M 个人在排队,那么其实相当于有 N 个许可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final Semaphore semaphore = new Semaphore(N);

Runnable r = () -> {
try {
//阻塞开始获取许可
semaphore.acquire(1);
//获取了一个许可
print("业务办理中");
//模拟业务操作: 处理排队业务
Thread.sleep(1000);
//用完了释放许可
semaphore.release(1);
} catch(Exception e) {
e.printStackTrace();
}
}

5.5.3 共享锁 CountDownLatch

CountDownLatch 功能相当于一个多线程环境下的倒数门闩,它可以指定一个计数值,在并发环境下由线程进行减1操作,当计数变为 0 之后,被 await 阻塞的线程将会唤醒。

它的一个经典示例就是,司机开车之前需要每个人报数,报数到 100 后说明人到齐发车:

1
2
3
4
5
6
7
8
9
10
11
12
13
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 1; i <= N; ++i) {// 启动报数任务
threadPoll.execute(new Runable() {
public void run() {
print("第" + i + "个人已到");
//倒数闩减1
doneSignal.countDown();
}
});
}
doneSignal.await(); //step2:等待报数完成,倒数闩计数值为0
print("人到齐,开车");

5.6 读写锁

读写锁的读和写操作的互斥原则如下:

  • 读读能共存
  • 读写不能共存
  • 写写不能共存

JUC 包中的读写锁接口为 ReadWriteLock ,主要有2个方法:

1
2
3
4
5
6
7
8
9
10
11
public interface ReadWriteLock {
/**
* 返回读锁
*/
Lock readLock();

/**
* 返回写锁
*/
Lock writeLock();
}

其主要实现类为 ReentrantReadWriteLock ,与 ReentrantLock 相比,前者更适合 读多写少 的场景,而 ReentrantLock 适合 读写比例相差不大 的场景

5.6.3 StampedLock 印戳锁

StampedLock 是对 ReentrantReadWriteLock 读写所的一种改进,主要改进为: 在没有写只有读的场景,StampedLock 支持不用加读锁而是直接进行读操作,最大限度提升读的效率,只有发生过写操作后,再加读锁才能进行读操作。

4.1 CPU物理缓存结构

L1缓存离 CPU 最近也最快,一般就 32k/64k水平;L2 缓存速度次之,容量一般比L1大;L3级缓存最大,比前面二者都大,速度也最慢,大小可能 12M 的水平。

4.2 并发编程的三大问题

三大问题分别是: 原子性、可见性、有序性 问题。必须要保证这 3 个,只要有一个没保证,在多线程情况下就可能不正确。

后续的略。

4.3 硬件层的MESI协议原理

为了缓解内存速度和CPU速度差问题,现在计算机都会为CPU添加高速缓存,每个CPU内核都有自己的一级、二级高速缓存,同一个CPU多个内核之间共享一个三级高速缓存。

4.3.1 总线锁和缓存锁

CPU的处理流程为:现将计算需要用到的数据缓存到CPU的高速缓存中,CPU计算时,直接从高速缓存获取数据并在计算完成后写会高速缓存,整个运算完成后再把高速缓存的数据同步回主存。由于每个线程可能运行在不同的CPU内核中,因此同一份数据可能被缓存到多个CPU内核中,就会发生内存可见性问题

后续的略。

4.4.1 重排序

编译器为什么要重排序?其目的为: 与其等待阻塞指令(如等待缓存刷入),不如先去执行其他指令。另外,CPU层面也有重排序。

4.5 JMM 详解

JMM (Java Memory Model ,Java 内存模型) 并不像JVM 内存结构一样是真实存在的运行实体,更多体现为一种规范和规则。

JMM 定义了一组规则或规范,该规范定义了一个线程对共享变量写入时,如何确保对另一个线程是可见的,实际上 JMM 提供了合理的禁用缓存以及禁止重排序的方法,所以其核心价值在于解决可见性和有序性。它的另一大价值在于:屏蔽各大硬件和操作系统差异,保证 Java 程序在各大平台堆内存访问是一致的。

JMM 规定所有的变量都存储在主存(类似于物理内存,但是有区别)中,每个 Java 线程都有自己的工作内存(类似于CPU高速缓存,但有区别)。

JMM 提供了一套自己的方案解决可见性和有序性问题,包括 volatile、synchronized、final 等。

4.7 volatile 不具备原子性

对于关键字 volatile 修饰的内存可见变量而言,具有2个重要的语义:

  • 使用 volatile 修饰的变量在变量的值发生改变时,会立刻同步到主存,并使其他线程的变量副本失效

  • 禁止指令重排序

使用++操作说明volatile不具备重排序功能:

  1. A、B线程分别运行在Core1 和 Core2 核上,假设此时共享value 的值为 0,现在线程 A、B 都读取value值到自己的工作内存上

  2. 线程 A 将 value 的值变为 1,完成了 assign、store 操作,假设在执行 write 指令前 A 的时间片用完,线程 A 被空闲但是 write 操作还没达到主存,但是呢, store 操作触发了写的信号,导致了 B 缓存过期

  3. B重新从主存读到 value,可想而知这时候还是 0

  4. 线程 B 执行完所有操作,将 value 值变成 1 写入主存

  5. 线程 A 重新拿到时间片,将过期了的 1 写入主存

所以,对于复合操作,volatile无法保障原子性,如果要保证复合操作的原子性,就需要用到锁

JVM 的Synchrod 轻量级锁使用 CAS 进行自旋抢锁,并且处于用户态下,所以轻量级锁开销较小。

3.1 什么是 CAS

JDK 5 增加的 JUC (java.util.concurrent) 并发包对操作系统的底层 CAS 原子操作进行了封装,为上层提供了 CAS 操作的 API 。

3.1.1 Unsafe 类中的 CAS 方法

Unsafe 是位于 sun.misc 包下的一个类,主要提供一些用于执行低级别、不安全的底层操作,如直接访问系统内存资源、自主管理内存资源等。从名字都可以看出这个类对普通程序员来说是“危险”的,官方也不建议直接在程序中使用这些类。

获取 Unsafe 实例

Unsafe 类时一个final 修饰的不允许继承的类,并且构造函数是 private 类型,源码如下:

1
2
3
4
5
6
7
8
9
10
11
public final class Unsafe {

static {
Reflection.registerMethodsToFilter(Unsafe.class, Set.of("getUnsafe"));
}

private Unsafe() {}

private static final Unsafe theUnsafe = new Unsafe();
...
}

所以我们无法在外部对 Unsafe 实例化,那么应该怎么获取呢?可以通过反射方式获取 theUnsafe 实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class JvmUtil {
//自定义地获取Unsafe实例的辅助方法
public static Unsafe getUnsafe() {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
} catch (Exception e) {
throw new AssertionError(e);
}
}
// 省略不相干代码
}
Unsafe提供的 CAS 方法

总共提供了如下3种方法:

1
2
3
4
5
public final boolean compareAndSwapObject(Object o, long offset, Object expected,Object x) 

public final boolean compareAndSwapInt(Object o, long offset, int expected, int x)

public final boolean compareAndSwapLong(Object o, long offset,long expected, long x)

这些方法首先将内存位置的值与预期值比较,如果相匹配,那么CPU 会自动将该内存位置的值更新为新值,并返回true;否则,CPU不做任何操作,并返回false。

3.1.2 使用 CAS 进行无锁编程

CAS 是一种无锁算法,底层CPU 利用原子操作判断 内存值与期望值是否相等,如果相等就给内存地址赋新值,否则不做任何操作。使用 CAS 进行无锁编程的步骤大概如下:

  1. 获得字段的期望值(oldValue)

  2. 计算出需要替换的新值(newValue)

  3. 通过 CAS 将 newValue 放在字段的内存地址上,如果 CAS 失败就重复从第1步开始,直到 CAS 成功。这种重复俗称“自旋”

举例: 假如 2 个线程 A 和B 对一个共享变量做 +1 操作,用 CAS 去做这个操作。但是线程是并发进行的,假如 A 和 B 都读到旧值是 1 ,然后并发通过 CAS 操作,都是 CAS(1, 2) ,但是CAS 是原子操作,同一个内存地址的 CAS 在同一个时刻只能执行一个,因此,假设 A 先执行,A 的 CAS(1, 2) 因为期望值是1,内存值也是1,操作成功,返回true;接下来 B 执行 CAS(1, 2) 肯定会失败了,因为内存值目前已经是 2 了,而期望值是 1 ,所以只得重新获取得到期望值 2,计算出新的值 3, 最后通过 CAS(2, 3) 才能成功。

3.2 JUC原子类

并发执行时,诸如 ++ 或者 – 类的运算不具备原子性,大家可能会用 synchronized 方法做同步,但效率肯定会影响的。JDK 为这些类型不安全的操作提供了一些原子类,与 synchronized 相比效率会更高。

3.2.1 JUC中的Atomic 原子操作包

只需要知道有:

  • 基本原子类:AtomicInteger、AtomicLong、AtomicBoolean

  • 数组原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

  • 引用原子类:AtomicReference、AtomicMarkableReference、AtomicStampedReference

等等一些常见的即可。

3.4 ABA问题

什么是 ABA 问题,举个例子:比如线程 A 从内存位置 M 取出 V1,另一个线程 B 也取出 V1 ,假设 B 进行了一些操作后将 M 位置的数据 V1 变成了 V2,然后又在一些操作之后将 V2 变成了 V1,然后线程 A 通过 CAS 操作时发现条件满足,CAS 操作成功。

但是这个过程是有问题的,A 操作的时候 V1 已经不是以前的 V1 了,这就是 ABA 问题。

3.4.2 ABA问题的解决方案

很多乐观锁的实现版本是: 使用版本号(Version)方式来解决ABA问题。 每次在执行数据的修改操作时都会带上一个版本号,版本号和数据的版本号一致就可以执行修改操作,否则执行失败。因为操作的版本号只会增加,不会减少。

当然,参考乐观锁的版本号实现, JDK 提供了一个 AtomicStampedReference 类来解决 ABA 问题,AtomicStampedReference 在 CAS 的基础上增加了一个 Stamp(印戳或标记)来察觉数据是否发生了变化。

当然,还可以使用 AtomicMarkableReference 解决。它是 AtomicStampedReference 的简化版,不关心修改过几次,只关心是否修改过。

3.5 提升高并发场景下 CAS 操作的性能

在竞争激烈的场景下,会导致大量的 CAS 自旋,比如大量线程同时并发修改一个 AtomicInteger 是,很多线程可能不停地自旋, 这浪费了大量的 CPU。

3.5.1 以空间换时间:LongAdder

AtomicLong 使用内部变量 value 保存着实际的 long 值,所有操作都是针对该 value 的,也就是说,当高并发的情况下,value 变量其实是一个热点,N 个线程竞争这一个热点,重试的线程越多,意味着 CAS 失败的概率越高。

LongAdder 的核心思想是热点分离,与 ConcurrentHashMap 的设计思想类似:将 value 值分离成一个数组,当多线程访问时,通过 Hash 算法将线程映射到数组的一个元素进行操作;而获取最终value 结果时,则将数组的元素求和。

具体一点:LongAdder 将 value 值分散到一个数组中,不同的线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,这样,即使线程数再多也不担心,各个线程分配到多个元素更新。如果要获取完整的 LongAdder 存储的值,只要将各个槽中的变量值累加即可

在 CAS 竞争非常激烈的场景, LongAdder 的性能可达到 AtomicLong 的 8 倍。

3.6 CAS在JDK中的广泛应用

3.6.1 CAS操作的弊端和规避措施

CAS 操作弊端主要有下面 3 点:

  • ABA问题,解决思路: 添加版本号、使用 AtomicStampedReference、AtomicMarkableReference,其中前者比较常用

  • 只能保证一个共享变量的原子操作。解决思路:将多个共享变量合并成一个共享变量来操作

  • 开销问题。解决思路:分散操作热点(如 LongAdder)、使用队列削峰(将 发生 CAS 争用的线程加入一个队列中排队,降低 CAS 争用的激烈程度,JUC 中非常重要的基础类 AQS 就是这么做的!)

3.6.2 CAS 在JDK中的应用

CAS 在 在java.util.concurrent.atomic包中的原子类、Java AQS 以及 显式锁、ConcurrentHashMap 等重要并发容器中都有非常广泛的应用

2.2 synchronized 关键字

synchronized 方法和 synchronized 同步块有什么区别呢?总体来说 synchronized 代码块是一种细粒度的并发控制,处于块之外的代码可以被多个线程并发访问。而如下代码本质上都是一样的,都是锁住当前对象:

1
2
3
4
5
6
7
8
9
public void plus() {
synchronized(this){ //对方法内部全部代码进行保护
amount++;
}
}

public synchronized void plus() {
amount++;
}

2.2.3 静态的同步方法

Class 没有公共的构造方法,Class 对象是在类加载的时候由 Java 虚拟机调用类加载器中的 defineClass 方法自动构造的,因此不能显式地声明一个 Class 对象。

普通的 synchronized 实例方法,其同步锁是当前对象 this 的监视锁,如果某个 synchronized 方法是static 方法,其同步锁又是什么呢?答案是:类对应的 Class 对象的监视锁。

在代码执行完毕或者程序出现异常,synchronized 持有的监视锁都会正常释放,所以无需手动释放。

2.4 Java对象结构与内置锁

Java 内置锁很多重要信息都存放在对象结构中。

2.4.1 Java 对象结构

Java 对象结构包括三部分:

  • 对象头:包括3个字段:Mark Word(存储GC标记位、锁状态)、类对象指针(存放方法区Class对象的地址,能确定该对象是哪个类的实例)、Array Length(如果对象是Java 数组,那么就是数组长度;如果不是数组,就不存在这字段)
  • 对象体:包括成员属性,包括父类的成员属性
  • 对齐字节:填充对齐,用来保证对象所占内存字节数为8的倍数

2.4.2 Mark Word 的结构信息

从Mark Word 锁标志位的状态来看,内置锁的状态就有了 4 种: 无锁、偏向锁、轻量级锁、重量级锁,这4种状态会随着竞争的激烈程度逐渐升级,并且是不可逆的过程,即不可降级。

2.4.3 使用 JOL 工具查看对象的布局

知道有 JOL 工具即可,略。

2.4.5 无锁、偏向锁、轻量级锁和重量级锁

JDK 1.6 以前,所有内置锁都是重量级锁,所以会在用户态和核心态之间频繁切换,所以代价很高。后续引入了 偏向锁和 轻量级锁,所以一共就有 4 种状态:无锁、偏向锁、轻量级锁、重量级锁。内置锁可以升级但是不能降级

2.5 偏向锁的原理与实战

原理:如果不存在线程竞争,那么线程获得锁之后就进入偏向状态:偏向锁标志位为 1,锁状态为 01。以后该线程获取锁时判断一下线程 ID 和标志位,就可以直接进入同步块,连 CAS 都不需要,从而提升性能。

但是,一旦有第二条线程需要竞争锁,偏向模式就立即结束,进入轻量级锁状态。这里需要好好理解下,感觉这句话不一定对,书中更准确的表述是:线程获取锁时,判断该偏向状态的锁的 ID 是不是自己的,如果是自己的,则直接进入同步块;否则,采用 CAS 操作将 Mark Word 中的偏向锁 ID 换成自己的,如果 CAS 操作成功,就获取偏向锁成功,执行同步块代码;如果 CAS 操作不成功,表示有竞争,抢锁线程被挂起,撤销占锁线程的偏向锁,然后将偏向锁膨胀为轻量级锁。

偏向锁的撤销

  1. 在一个安全点停止拥有锁的线程
  2. 遍历线程栈帧,找到并删除栈帧,使其变为无锁状态,修复锁指向的 Mark Word ,并清除锁 Mark Word 中的线程 ID
  3. 将当前锁升级为轻量级锁
  4. 唤醒当前线程

2.6 轻量级锁的原理与实战

轻量级锁是一种自旋锁,希望在应用层面通过自旋解决线程同步问题。轻量级锁的执行过程:

抢锁线程进入临界区之前,如果内置锁没有被锁定,JVM 首先将在抢锁线程的栈帧中创建一个锁记录(Lock Record),用于存储对象目前的 Mark Word 的拷贝

然后抢锁线程将使用 CAS 自旋操作,尝试将内置锁对象头的 Mark Word 的ptr_to_lock_record(锁记录指针)更新为抢锁线程栈帧中拷贝的 Mark Word ,如果这个更新执行成功,线程就拥有了这个对象锁,之后会改掉 Mark Word 中的lock 标记为 00,即轻量级锁

为什么要拷贝呢?因为内置锁对象的 Mark Word 结构会有所变化,而不再存着无锁状态下的一些信息,所以要拷贝。

2.6.3 轻量级锁的分类

轻量级锁分为 2 种:

  • 普通自旋锁: 抢锁线程一直在自旋,而不是被阻塞,直到占有锁的线程释放之后抢锁线程才能获取到锁
  • 自适应自旋锁:自旋次数不是固定的,而是根据系统以前的经验来的。解决的是锁竞争时间不确定的问题

2.7 重量级锁的原理与实战

在 JVM 中,每个对象都关联一个监视器,这里的对象包括 Object 实例和 Class 实例。监视器是一个同步工具,相当于一个许可证:拿到许可证的线程可以进入临界区执行,没有拿到的则需要阻塞等待。

2.7.1 重量级锁的核心原理

HotSpot 虚拟机中,监视器是由 C++ 类 ObjectMonitor 实现的,它有以下几个比较关键的属性:

Owner、WaitSet、Cxq、EntryList ,其中 Owner 所指向的线程为获得锁的线程,WaitSet、Cxq、EntryList 是 3 个队列,用于存放抢夺重量级锁的线程:

  • Cxq:竞争队列(Contention Queue),所有请求锁的线程首先被放在这个竞争队列中(不是真正的队列,只是由Node及其 next 指针逻辑构成,每次都通过 CAS 操作在头部新增节点,取元素从尾获取,因为只有 Owner 线程才能从队尾获取节点,所以,Cxq 出队无争用操作,是无锁结构)
  • EntryList: Cxq 中那些有资格成为候选资源的线程被移动到 EntryList。Cxq 会被线程并发访问,为了降低对 Cxq 的争用而建立了 EntryList。在 Owner 线程释放锁时,JVM 会从 Cxq 中迁移线程到 EntryList,并会指定 EntryList 中的某个线程(一般为 Head) 为 Ready Thread。 EntryList 作为候选竞争线程而存在(自己加的:但由于是非公平锁,所以这个 Ready Thread 不一定能得以执行,后续的说明非公平性会提及)。
  • WaitSet: 某个拥有锁的线程在调用 Object.wait() 方法之后将被阻塞,然后线程将被放置在 WaitSet 链表中。等到执行 Object.notify/notifyAll 唤醒之后,该线程又会回到 EntryList 中(注意不是 Cxq 中)。

Synchronized 的不公平性:在线程进入 Cxq 前,抢锁线程会先尝试通过 CAS 自旋获取锁,如果获取到就直接用了;获取不到,才进入 Cxq 队列,这对于已经进入 Cxq 队列的线程是不公平的。但是这由于避免了 Cxq 队列中线程唤醒——内核态到用户态的过程,节省了时间,提升了吞吐率

2.7.2 重量级锁开销

处于 Cxq、EntryList 以及 WaitSet 中的线程都处于阻塞状态,线程的阻塞或者唤醒都需要操作系统来帮忙,需要通过系统调用实现,进城需要从用户态切换到内核态,这种切换需要消耗很多时间,有可能比用户执行代码的时间还要长。

由于轻量级锁使用 CAS 进行自旋抢锁,而 CAS 操作都处于用户态下,不存在用户态和内核态的切换,因此轻量级锁的开销比较小。

2.8 偏向锁、轻量级锁与重量级锁的对比

总结一下,synchronized 的执行过程大致如下:

  1. 线程抢锁时,JVM 首先检测内置锁对象 Mark Word 的biased_lock(偏向锁标识)是否为1,lock (锁标志位)是否为01,如果都满足,说明内置锁对象为可偏向状态
  2. 如果内置锁对象为可偏向状态,JVM 检查 Mark Word 中线程 ID 是否为当前抢锁线程的 ID,如果是,标识抢锁线程处于偏向所状态,快速获得锁,开始执行临界区代码
  3. 如果Mark Word 中的线程 ID 不是当前抢锁线程,就通过 CAS 竞争锁。如果竞争成功,就将 Mark Word 中的线程 ID 设置为抢锁线程的 ID ,偏向锁标志设为 1 ,锁标志位设为 01,此时内置锁对象处于偏向锁状态,然后开始执行临界区代码
  4. 如果 CAS 竞争失败,说明发生了竞争,撤销偏向锁,进而升级为轻量级锁
  5. JVM 使用 CAS 将锁对象的 Mark Word 替换为抢锁线程的锁记录指针,如果成功,抢锁线程就获得锁;如果替换失败,就表示其他线程在竞争锁。那么 JVM 尝试使用 CAS 自旋替换抢锁线程的锁记录指针,如果自旋成功(抢锁成功),那么锁对象依旧处于轻量级锁状态。
  6. 如果JVM的CAS 替换锁记录指针自旋失败,轻量级锁就膨胀为重量级锁,后面等待锁的线程也要进入阻塞状态

3种锁的优缺点对比和适用场景如下表所示:

优点 缺点 适用场景
偏向锁 加解锁不需要额外消耗,和执行非同步方法仅存在纳秒级差距 如果线程间存在锁竞争,会带来额外的撤销锁操作 适用于只有一个线程访问的临界区场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 抢不到锁的竞争线程会CAS自旋,消耗CPU 锁占用时间短,吞吐量低
重量级锁 线程竞争无需自旋,不消耗CPU 线程阻塞,响应时间慢 锁占用时间长,吞吐量高

2.9 线程间通信

多个线程共同操作共享的资源时,线程间通过某种方法互相告知自己的状态,以避免无效的资源争夺。线程间通信的方式可以有很多种:等待-通知、共享内存、管道流。

2.9.2 低效的线程轮询

轮询版本的生产者-消费者模型中,消费者每一轮消费,无论数据区是否为空,都需要进行数据区的询问和判断:

1
2
3
4
5
6
7
8
9
public synchronized IGoods get() throws Exception {
IGoods goods = null;
if (amount <= 0) {
Print.tcfo("队列已经空了!");
//数据区为空,直接返回
return null;
}
...
}

当数据区为空(amount <= 0)时,消费者无法取出数据,但是仍然做无用的询问工作,浪费了CPU的时间片。同理,对于生产者也会存在这样的问题:

1
2
3
4
5
6
7
public synchronized void add(T element) throws Exception {
if (amount.get() > MAX_AMOUNT) {
Print.tcfo("队列已经满了!");
return;
}
...
}

当数据区满时,生产者无法加入数据,这时执行add方法也浪费CPU的时间片。使用“等待-通知”方式进行生产者与消费者之间的线程通信可以避免这种浪费。

具体方法是:当数据区满时,给让生产者等待,当可以添加数据时,给生产者发通知,让生产者唤醒;消费者同理。具体操作为:消费者取出一个数据后,由消费者去唤醒等待的生产者;生产者加入一个数据后,由生产者唤醒等待的消费者。

2.9.3 wait 、notify 方法的原理

wait 方法

对象的 wait 方法作用就是让当前线程阻塞并等待被唤醒,wait 方法与对象监视器密切相关,使用时一定要放在同步块中:

1
2
3
4
5
synchronized(locko) {
...
locko.wait();
....
}

其原理大致如下:

  • 线程调用了 locko 的wait 方法后,JVM 会将当前线程假如 locko 监视器的 WaitSet(等待集) 中,等待被其他线程唤醒

  • 当前线程会释放 locko 对象监视器 的 Owner 权利,让其他线程可以抢夺 locko 对象的监视器

  • 让当前线程等待,其状态变为 WAITING

notify 方法

notify 方法也需要放在同步块中执行,它有2个版本:

  • notify : 唤醒 locko 监视器等待集中的第一条等待线程,被唤醒的线程进入 EntryList ,状态从 WAITING 变为 BLOCKED

  • notifyAll: 唤醒 locko 监视器等待集中全部等待线程,所有线程进入 EntryList ,状态从 WAITING 变为 BLOCKED

notify 核心原理如下:

  • 当线程调用了 locko 的 notify 方法后,JVM 会唤醒 locko 监视器等待集中的第一条等待线程(如果是 notifyAll 则是所有线程),被唤醒的线程进入 EntryList ,状态从 WAITING 变为 BLOCKED,具备了排队抢夺监视器 Owner权利的资格

  • EntryList 中的线程抢夺到监视器的 Owner 权利后,线程的状态从 BLOCKED 变成 RUNNABLE,具备重新执行的资格

2.9.5 生产者-消费者之间的线程间通信

此实现版本大致需要定义以下3个同步对象:

  • LOCK_OBJECT:用于临界区同步,临界区资源为数据缓冲区的 dataList 变量和 amount 变量

  • NOT_FULL:用于数据缓冲区的未满条件等待和通知,生产者在添加元素时需要判定是否已满,如果已满,则进入 NOT_FULL 的同步去等待,只要消费者耗费一个元素,就会通过 NOT_FULL 发送通知。

  • NOT_EMPTY:同理,这是用于数据缓冲区的非空条件的等待和通知。消费者在消费前需要判断数据区是否空,如果是,消费者就进入 NOT_EMPTY 的同步区等待被通知,只要生产者添加一个元素,生产者就会通过 NOT_EMPTY 发送通知

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class CommunicatePetStore {

public static final int MAX_AMOUNT = 10; //数据缓冲区最大长度

//数据缓冲区,类定义
static class DataBuffer<T> {
//保存数据
private List<T> dataList = new LinkedList<>();
//数据缓冲区长度
private Integer amount = 0;

private final Object LOCK_OBJECT = new Object();
private final Object NOT_FULL = new Object();
private final Object NOT_EMPTY = new Object();

// 向数据区增加一个元素
public void add(T element) throws Exception {
while (amount > MAX_AMOUNT) {
synchronized (NOT_FULL) {
Print.tcfo("队列已经满了!");
//等待未满通知
NOT_FULL.wait();
}
}
synchronized (LOCK_OBJECT) {
dataList.add(element);
amount++;
}
synchronized (NOT_EMPTY) {
//发送未空通知
NOT_EMPTY.notify();
}
}

/**
* 从数据区取出一个商品
*/
public T fetch() throws Exception {
while (amount <= 0) {
synchronized (NOT_EMPTY) {
Print.tcfo("队列已经空了!");
//等待未空通知
NOT_EMPTY.wait();
}
}

T element = null;
synchronized (LOCK_OBJECT) {
element = dataList.remove(0);
amount--;
}

synchronized (NOT_FULL) {
//发送未满通知
NOT_FULL.notify();
}
return element;
}
}

public static void main(String[] args) throws
InterruptedException {
Print.cfo("当前进程的ID是" + JvmUtil.getProcessID());
System.setErr(System.out);
//共享数据区,实例对象
DataBuffer<IGoods> dataBuffer = new DataBuffer<>();

//生产者执行的动作
Callable<IGoods> produceAction = () -> {
//首先生成一个随机的商品
IGoods goods = Goods.produceOne();
//将商品加上共享数据区
dataBuffer.add(goods);
return goods;
};
//消费者执行的动作
Callable<IGoods> consumerAction = () -> {
// 从PetStore获取商品
IGoods goods = null;
goods = dataBuffer.fetch();
return goods;
};
// 同时并发执行的线程数
final int THREAD_TOTAL = 20;
//线程池,用于多线程模拟测试
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);

//假定共11个线程,其中有10个消费者,但是只有1个生产者
final int CONSUMER_TOTAL = 11;
final int PRODUCE_TOTAL = 1;

for (int i = 0; i < PRODUCE_TOTAL; i++) {
//生产者线程每生产一个商品,间隔50毫秒
threadPool.submit(new Producer(produceAction, 50));
}
for (int i = 0; i < CONSUMER_TOTAL; i++) {
//消费者线程每消费一个商品,间隔100毫秒
threadPool.submit(new Consumer(consumerAction, 100));
}
}
}

2.9.6 需要在synchronized 同步块的内部使用 wait 和notify

调用 wait 和 notify 方法时,“当前线程”必须拥有该对象的同步锁,也即wait 和notiry 方法必须在同步块中使用,否则JVM 就会抛出 IllegalMonitorStateException 异常。

这是为什么呢?还得从这 2 个方法的原理说起:

  • 调用 wait :JVM 会释放当前线程的对象监视器的 Owner 资格,还会将当前线程移入监视器的 WaitSet 队列,这些操作都是和对象监视器锁相关的,所以,当前线程执行 wait 方法前,必须通过 synchronized 方法称为对象锁的 Owner,要在同步块内调用

  • 同理, 调用 notify 时,JVM 从对象锁的监视器 WaitSet 队列移动线程到其 EntryList 队列,这些操作都与对象锁的监视器有关,所以,也必须先成为对象锁监视器的 Owner,然后在同步块内调用

2.9.7 调用wait、notify方法进行线程间通信的要点(自己加的章节)

有了以上的知识储备,来说下wait 和 notify 方法进行线程间通信的要点:

  • 调用某个同步对象 locko 的 wait 和 notify 类型方法前,必须要获得这个锁对象的监视器锁,这2个类型的方法必须放在同步块中执行,否则报错

  • 调用wait方法是使用while进行条件判断,如果是在某种条件下进行等待,对条件的判断就不能使用if语句做一次性判断,而是使用while 循环进行反复判断,只有这样才能在线程被唤醒后继续检查wait 条件,并在条件没有满足的情况下继续等待。

正确的条件判断代码:

1
2
3
4
5
6
7
8
9
10
//消费者获取元素
public T fetch() throws Exception {
while (amount <= 0) {
synchronized (NOT_EMPTY) {
//队列空了
NOT_EMPTY.wait();
}
}
...
}

错误地使用 if 条件判断:

1
2
3
4
5
6
7
8
9
10
//消费者获取元素
public T fetch() throws Exception {
if (amount <= 0) {
synchronized (NOT_EMPTY) {
//队列空了
NOT_EMPTY.wait();
}
}
...
}

至于为什么要这样,从之前说的原理我们知道,wait 方法会释放锁。我们考虑这么一种场景:

  • 假如有 2 个消费者 consumerOne 和 consumerTwo

  • consumerOne 在判定是空的时候,wait 了,这时候会释放锁;由于释放了锁,consumerTwo 自然就能获取到这个锁,然后发现也是空的,自然也 wait 了

  • 也就是说 consumerOne 和 consumerTwo 都在wait 等待了,这是问题关键

  • 此时,生产者放入一个元素,完了调用 notifyAll ,consumerOne 和 consumerTwo 都被唤醒了,他们会竞争锁

  • 假如 consumerOne 拿到锁了,consumerTwo 还在锁池中继续阻塞,consumerOne 执行wait 后面的代码消费了,接着又会变为空

  • consumerOne 执行完成后,consumerTwo 拿到锁也接着执行 wait 后面的代码,由于被 consumerOne 消费变为空了之后,consumerTwo 后续的执行以不空作为条件的执行会出现问题

如果不太明白,还可以参考为什么生产者消费者中模式中要用while作临界判断?_xuwen_chen的博客-CSDN博客