高并发场景下常见的设计模式可能存在线程安全问题,比如传统的单例模式就是典型。本章介绍几种高并发场景下常用的几种模式:线程安全的单例模式、ForkJoin模式、生产者-消费者模式、Master-Worker模式和 Future模式。
8.1 线程安全的单例模式
没有volatile 情况下的双重检查实现的单例模式,可能会存在问题,可以看到下面这句代码:
instance = new Singleton();
转换成(具有原子性的)汇编指令大致会分为 3个:
分派一块内存 M
在 M 上初始化 Singleton 对象
M 的地址赋值给 instance 变量
但是,以前说过可能会进行重排序,上面 3 个指令优化之后可能会变为:
分派一块内存 M
M 的地址赋值给 instance 变量
在 M 上初始化 Singleton 对象
指令重排后,获取单例可能导致问题发生,假设 A 、B线程过来获取单例:
A 通过 getInstance() 方法,执行到分配一块内存并将地址赋值给 instance,恰好发生了线程切换,此时,A 还没来得及对 M 指向的内存初始化
线程 B 进入 getInstance() 方法,判断 instance 不为空,于是 B 直接获取到了未初始化的 instance
线程 B 使用未初始化完全的对象 instance 在访问 instance 的成员变量时可能会发生异常
所以需要添加 volatile 防止指令重排。
8.1.5 使用静态内部类实现懒汉式单例模式
虽然通过 双重锁检查+volatile相结合的方式能实现高性能、线程安全的单例模式,但是该实现的底层原理比较复杂、实现繁琐,另一种易于理解、编程简单的单例模式实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class Singleton { private Singleton() {}
public static final Singleton getInstance() { return LazyHolder.INSTANCE; }
private static class LazyHolder { private static final Singleton INSTANCE = new Singleton(); } }
|
这种方式只有在调用 getInstance() 的时候才会初始化单例,该方式既解决了线程安全问题,又解决了写法繁琐的问题。书中推荐使用这种方案。
8.2 Master-worker 模式
这是一种常见的高并发模式,它的核心思想是:任务的调度和执行分离,调度任务的角色是 Master,执行任务的角色是 Worker,Master 负责接收、分配和合并(Merge)任务结果,Worker 负责执行任务。
8.2.1 Master-worker 模式的参考实现
Master 的参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class Master<T extends Task, R> { private HashMap<String, Worker<T, R>> workers = new HashMap<>();
private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();
protected Map<String, R> resultMap = new ConcurrentHashMap<>();
private Thread thread = null;
private AtomicLong sum = new AtomicLong(0);
public Master(int workerCount) { for (int i = 0; i < workerCount; i++) { Worker<T, R> worker = new Worker<>(); workers.put("子节点: " + i, worker); } thread = new Thread(() -> this.execute()); thread.start(); }
public void submit(T task) { taskQueue.add(task); }
private void resultCallBack(Object o) { Task<R> task = (Task<R>) o; String taskName = "Worker:" + task.getWorkerId() + "-" + "Task:" + task.getId(); R result = task.getResult(); resultMap.put(taskName, result); sum.getAndAdd((Integer) result); }
public void execute() {
for (; ; ) { 配任务 for (Map.Entry<String, Worker<T, R>> entry :workers.entrySet()) { T task = null; try { task = this.taskQueue.take();
Worker worker = entry.getValue(); 获取节点 worker.submit(task, this::resultCallBack); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public void printResult() { Print.tco("----------sum is :" + sum.get()); for (Map.Entry<String, R> entry : resultMap.entrySet()) { String taskName = entry.getKey(); Print.fo(taskName + ":" + entry.getValue()); } } }
|
Worker 的代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class Worker<T extends Task, R> { private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>(); static AtomicInteger index = new AtomicInteger(1); private int workerId; private Thread thread = null;
public Worker() { this.workerId = index.getAndIncrement(); thread = new Thread(() -> this.run()); thread.start(); }
public void run() { for (; ; ) { try { T task = this.taskQueue.take(); task.setWorkerId(workerId); task.execute();
} catch (InterruptedException e) { e.printStackTrace(); } } }
public void submit(T task, Consumer<R> action) { task.resultAction = action; try { this.taskQueue.put(task); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
8.3 FokJoin模式
目前没心思看,先略过
8.4 生产者-消费者模式
略过
8.5 Future模式
略