0%

第8章:JUC容器类

高并发场景下常见的设计模式可能存在线程安全问题,比如传统的单例模式就是典型。本章介绍几种高并发场景下常用的几种模式:线程安全的单例模式、ForkJoin模式、生产者-消费者模式、Master-Worker模式和 Future模式。

8.1 线程安全的单例模式

没有volatile 情况下的双重检查实现的单例模式,可能会存在问题,可以看到下面这句代码:

instance = new Singleton();

转换成(具有原子性的)汇编指令大致会分为 3个:

  1. 分派一块内存 M

  2. 在 M 上初始化 Singleton 对象

  3. M 的地址赋值给 instance 变量

但是,以前说过可能会进行重排序,上面 3 个指令优化之后可能会变为:

  1. 分派一块内存 M

  2. M 的地址赋值给 instance 变量

  3. 在 M 上初始化 Singleton 对象

指令重排后,获取单例可能导致问题发生,假设 A 、B线程过来获取单例:

  1. A 通过 getInstance() 方法,执行到分配一块内存并将地址赋值给 instance,恰好发生了线程切换,此时,A 还没来得及对 M 指向的内存初始化

  2. 线程 B 进入 getInstance() 方法,判断 instance 不为空,于是 B 直接获取到了未初始化的 instance

  3. 线程 B 使用未初始化完全的对象 instance 在访问 instance 的成员变量时可能会发生异常

所以需要添加 volatile 防止指令重排。

8.1.5 使用静态内部类实现懒汉式单例模式

虽然通过 双重锁检查+volatile相结合的方式能实现高性能、线程安全的单例模式,但是该实现的底层原理比较复杂、实现繁琐,另一种易于理解、编程简单的单例模式实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Singleton {
//私有构造函数
private Singleton() {}

//获取单例的方法
public static final Singleton getInstance() {
return LazyHolder.INSTANCE;
}

private static class LazyHolder {
private static final Singleton INSTANCE = new Singleton();
}
}

这种方式只有在调用 getInstance() 的时候才会初始化单例,该方式既解决了线程安全问题,又解决了写法繁琐的问题。书中推荐使用这种方案。

8.2 Master-worker 模式

这是一种常见的高并发模式,它的核心思想是:任务的调度和执行分离,调度任务的角色是 Master,执行任务的角色是 Worker,Master 负责接收、分配和合并(Merge)任务结果,Worker 负责执行任务。

8.2.1 Master-worker 模式的参考实现

Master 的参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

public class Master<T extends Task, R> {
// 所有Worker的集合
private HashMap<String, Worker<T, R>> workers = new HashMap<>();


// 任务的集合
private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();

//任务处理结果集合
protected Map<String, R> resultMap = new ConcurrentHashMap<>();

//Master的任务调度线程
private Thread thread = null;

//保持最终的和
private AtomicLong sum = new AtomicLong(0);

public Master(int workerCount) {
// 每个Worker对象都需要持有queue的引用,用于领任务与提交结果
for (int i = 0; i < workerCount; i++) {
Worker<T, R> worker = new Worker<>();
workers.put("子节点: " + i, worker);
}
thread = new Thread(() -> this.execute());
thread.start();
}

// 提交任务
public void submit(T task) {
taskQueue.add(task);
}

//获取worker结果处理的回调函数
private void resultCallBack(Object o) {
Task<R> task = (Task<R>) o;
String taskName = "Worker:" + task.getWorkerId() + "-" + "Task:" + task.getId();
R result = task.getResult();
resultMap.put(taskName, result);
sum.getAndAdd((Integer) result); //和的累加
}

// 启动所有的子任务
public void execute() {

for (; ; ) {
// 从任务队列中获取任务,然后Worker节点轮询,轮流分
配任务
for (Map.Entry<String, Worker<T, R>> entry :workers.entrySet()) {
T task = null;
try {
task = this.taskQueue.take();

//获取任务
Worker worker = entry.getValue(); //
获取节点
worker.submit(task, this::resultCallBack); //分配任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 获取最终的结果
public void printResult() {
Print.tco("----------sum is :" + sum.get());
for (Map.Entry<String, R> entry : resultMap.entrySet()) {
String taskName = entry.getKey();
Print.fo(taskName + ":" + entry.getValue());
}
}
}

Worker 的代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Worker<T extends Task, R> {
//接收任务的阻塞队列
private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();
//Worker 的编号
static AtomicInteger index = new AtomicInteger(1);
private int workerId;
//执行任务的线程
private Thread thread = null;

public Worker() {
this.workerId = index.getAndIncrement();
thread = new Thread(() -> this.run());
thread.start();
}

/**
* 轮询执行任务
*/
public void run() {
// 轮询启动所有的子任务
for (; ; ) {
try {
//从阻塞队列中提取任务
T task = this.taskQueue.take();
task.setWorkerId(workerId);
task.execute();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//接收任务到异步队列
public void submit(T task, Consumer<R> action) {
task.resultAction = action; //设置任务的回调方法
try {
this.taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

8.3 FokJoin模式

目前没心思看,先略过

8.4 生产者-消费者模式

略过

8.5 Future模式

谢谢你的鼓励