0%

Java筑基-:线程基本使用入门(2)

一、分而治之思想

算法中分而治之思想和动态规划思想有点类似,要注意区分:

  • 分而治之: 将一个任务分割成很多小任务,并且各个任务之间无关联

  • 动态规划: 将一个任务 A 分割成很多小任务,比如 a1、a2、a3 ,假如存在 a1 执行完成才能执行a3,那就是小任务有关联性了,这就是动态规划思想了

快排,归并,二分都是分而治之的思想。

在 Java 7 中提供了 Fork/Join 工具(Fork:将任务拆分成很多独立的子任务;Join:将所有的子任务结果合并)来让分而治之的思想并发执行起来。当然,我们也可以不使用这个框架,自己将任务分成很多小任务,然后用多线程去执行。不过呢,Java 的 ForkJoin 给提供了工作窃取(work-stealing) 算法,能够让执行更高效。

2.1 工作窃取

工作窃取(work-stealing) 简单来说就是空闲线程试图从繁忙线程的 deques 中窃取工作。

默认情况下,每个工作线程从自己的双端队列中获取任务,但如果自己队列中任务已经执行完成,队列为空时,它就会从另一个繁忙线程的双端队列尾部或者全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。

为什么会有任务先执行完成呢?因为工作量的划分不一定均等。即使是相同数量的数字相加,比如,每个任务都是10个数字相加,如果数字很大的话,耗时也会很长。

二、Java 提供的 ForkJoin

Java 提供分而治之的框架: ForkJoin。既然是要提交任务,我们需要对任务进行封装,ForkJoin 提供了 ForkJoinTask 来包装任务:

1
public abstract class ForkJoinTask<V> implements Future<V>, Serializable

不过一般不直接使用它,而是用它的 2 个主要的子类:

1
2
3
4
public abstract class RecursiveTask<V> extends ForkJoinTask<V>


public abstract class RecursiveAction extends ForkJoinTask<Void>

从这 2 个子类我们能看出,RecursiveTask 更适合有返回值的情形;RecursiveAction 适合没有返回值的情形。ForkJoinTask 它有2种使用方法:

  • 同步用法:采用 invoke 来提交任务

  • 异步用法:采用submit 或者 execute 方法来提交任务

使用 ForkJoin 的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
int[] src = MakeArray.makeArray();
//第一步,创建池子
ForkJoinPool pool = new ForkJoinPool();
//第二步:构建任务
SumTask innerFind = new SumTask(src, 0, src.length - 1);

long start = System.currentTimeMillis();
//第三步、提交,invoke 意味着同步执行
pool.invoke(innerFind);
//第四步、获取结果
System.out.println("the result is " + innerFind.join() +
",spend time:" + (System.currentTimeMillis() - start));
}

其中,makeArray 只是用来生成一个数组:

1
2
3
4
5
6
7
8
9
10
11
12
public class MakeArray {
public static final int ARRAY_LENGTH = 4000;
public final static int THRESHOLD = 47;

public static int[] makeArray() {
int[] result = new int[ARRAY_LENGTH];
for (int i = 0; i < ARRAY_LENGTH; i++) {
result[i] = i;
}
return result;
}
}

真正的任务代码,由于我们需要有返回值,所以我们继承了 RecursiveTask :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SumTask extends RecursiveTask<Integer> {

private static final int THREASHOLD = MakeArray.ARRAY_LENGTH / 10;
private int[] src;
private int fromIndex;
private int toIndex;

public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}

@Override
protected Integer compute() {
//判定任务大小是否合适
if (toIndex - fromIndex < THREASHOLD) {
//满足要求,计算结果
System.out.println("from index = " + fromIndex + ",toIndex = " + toIndex);
int count = 0;
for (int i = fromIndex; i <= toIndex; i++) {
//Thread.sleep(1);
count = count + src[i];
}
return count;
} else {
//还不满足要求,则继续拆分任务
int mid = (fromIndex + toIndex) / 2;
//拆分出左边的任务
SumTask left = new SumTask(src, fromIndex, mid);
//拆分出右边的任务
SumTask right = new SumTask(src, mid + 1, toIndex);
//提交到我们创建的 pool 中
invokeAll(left, right);
return left.join() + right.join();
}
}
}

在重写的 compute 方法中,我们不断拆分任务,直至任务满足我们定义的阈值位置。计算量越大,中途 IO 操作越多 或者 休眠时间越多, ForkJoin 的优势越明显。如果只是少量数据,那么由于:

  1. ForkJoin 的递归调用

  2. ForkJoin 多线程切换

反而导致还没单线程那么快。但是当计算过程中有IO 操作或者休眠时间时,ForkJoin 在大量数据情况下的优势就会显示出来。我们注意上述代码中注释的代码 : Thread.sleep(1); 如果在单线程计算和 ForkJoin 计算过程中都休眠 1ms ,后续导致的耗时差异是很大的,这个可以自行验证。

上面我们看了同步的操作,接下来看下如果进行异步操作,以下代码将根据指定的文件中,所有的 txt 文件,有可能文件下还有目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool();

FindDirFiles task = new FindDirFiles(new File("D:/"));

//异步提交
pool.execute(task);

//主线程做自己的事情
System.out.println("Task isRuning");
Thread.sleep(1);

System.out.println("doing other works");

//获取结果,这是一个阻塞方法
task.join();

//由于上述join 是阻塞方法,所以一定得有结果之后,才会打印下面的 task end
System.out.println("task end");
}

首先 main 方法中调用与上述的同步差不多,只不过在使用 execute 异步调用后,主线程的日志和子线程的日志同时在打,最后,我们会调用 task.join 阻塞方法,这也导致当所有文件遍历完成后,才会在main 线程中 输出最后的 task end 语句。我们看下 FindDirFiles 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FindDirFiles extends RecursiveAction {

private File path;

public FindDirFiles(File path) {
this.path = path;
}

@Override
protected void compute() {
List<FindDirFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
//对每个子目录新建一个子任务
subTasks.add(new FindDirFiles(file));
} else {
//遇到文件,检查
if (file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:" + file.getAbsolutePath());
}
}
}

if (!subTasks.isEmpty()) {
//在当前的 ForkJoinPool 上调度所有的子任务
for (FindDirFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}

}
}

由于不需要返回结果,所以我们继承 RecursiveAction 即可,在 compute 中,我们将为每个目录创建一个 Task 加入 list 中,最后一并 invokeAll 执行。

三、CountDownLatch

它的作用是等待一个线程或者多个线程执行完成后再执行后续的操作。比如说要等所有的初始化工作(多个子线程)执行完成之后,才进入主线程执行。

对于 CountDownLatch 而言,计数器和线程数量是没关系的,有可能在同一个线程中做 2 个任务执行 2 次 countDown 操作。

线程做完 countDown 操作之后,能够继续执行,并不是说一定就要关闭之类的。

CountdownLatch 的 await 方法(即等待所有的扣减到 0 后才能执行的地方)可以在多个地方使用,比如同时在 主线程调用 countDownLatch.await() 以及在某个子线程中调用 countDownLatch.await() ,当 countDownLatch 计数器减为 0 的时候,主线程和那个子线程都会被唤醒接着执行。

四、CyclicBarrier

它的主要思想是,在某个点设置一个屏障, 比如 3 个线程,如果 A 线程先执行完成,它会 await 在那里等待;同样 B 线程也会这样,一直到 C 线程也执行完成,此时,3个线程才一起执行。它有 2 个主要构造方法:

1
2
3
4
//parties 表示操控的线程数量
public CyclicBarrier(int parties) {

}

以及:

1
2
3
public CyclicBarrier(int parties, Runnable barrierAction) {

}

其中第二个 构造方法的 barrierAction 表示的是,当所有的操控的线程都执行到这个屏障的时候,此时优先执行一下这个 Rnnable ,之后三个线程才继续执行。这样做的一个好处就是:

举个例子吧,如果3 个线程计算一个任务的 3 个部分,这时候,我们可以用设置这个 Runnable 来合并这 3 部分结果,之后,这 3 个线程都拿着合并后的结果执行后面的操作。

五、CountDownLatch 与 CyclicBarrier 的区别

虽然二者在功能上很相似,但其实是有很大区别的,主要体现在:

  • CountDownLatch 定义的数字只能扣减一次,CyclicBarrier 能多次调用 await ,后续的 await 还都能触发 barrierAction 的执行

  • CountDownLatch 用外部线程协调;而 CyclicBarrier 本身相互协调

  • CountDownLatch 扣减数和线程数不一样,比如一个线程可以 countdown 多次;但 CyclicBarrier 初始化时传入的 数量必须是管控的线程数量

  • 这点没大听懂,大体是说 CountDownLatch 运行中不允许其他线程执行;而CyclicBarrier 可以通过 barrierAction 执行其他线程任务。还需要额外确认这点

六、Semaphore

主要用于流控的场景。比如,数据库最多只能有 10 个连接,那么我们可以通过 Semaphore 发放 10 个 许可证。在正常使用的时候通过 acquire 和 release 来控制获取和释放操作。还是看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static class ConnectionImpl implements Connection {
。。。省略实现
}


public class SemaphoreDemo {
//许可证数量
private final static int POOL_SIZE = 10;

//2个新号指示器,分别表示 池子还有可用的连接 / 已用的连接
private final Semaphore useful, uselessful;
//存放资源的池子
private static LinkedList<Connection> pool = new LinkedList<>();

static {
//初始化放入资源
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(new ConnectionImpl());
}
}

public SemaphoreDemo() {
this.useful = new Semaphore(POOL_SIZE);
this.uselessful = new Semaphore(0);
}

//归还连接
public void returnConnection(Connection connection) throws Exception {
if (connection != null) {
System.out.println("当前有" + useful.getQueueLength() + "个线程等待连接,"
+ "可用连接数:" + useful.availablePermits());
//通过 uselessful 的 acquire 来获取空位
synchronized (pool) {
pool.addLast(connection);
}
//许可证释放
useful.release();
}
}

//获取连接
public Connection takeConnect() throws Exception {
//获取许可,如果没有许可可能阻塞
useful.acquire();
Connection connection;
//拿到许可,为什么还需要同步?
//这是因为如果有 4 个许可,那么可能同时有 4 个线程过来取值,所以,还需要对池子同步
synchronized (pool) {
connection = pool.removeFirst();
}
//空位释放
uselessful.release();
return connection;
}
}

接下来就是编写 Demo 使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static class BusiThread extends Thread {
private static SemaphoreDemo semaphoreDemo = new SemaphoreDemo();

@Override
public void run() {
super.run();
//随机,让每个线程持有的连接时间不一样
Random r = new Random();
long start = System.currentTimeMillis();

try {
Connection connection = semaphoreDemo.takeConnect();
System.out.println("Thread_" + Thread.currentThread().getName()
+ "获取数据库连共耗时【" + (System.currentTimeMillis() - start) + "】ms");
//模拟业务操作
Thread.sleep(100 + r.nextInt(100));
System.out.println("查询数据完成,归还连接");
semaphoreDemo.returnConnection(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
}


public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread t = new BusiThread();
t.start();
}
}

上述代码会显示出前面 10 个线程几乎是 0ms 就获取到资源,而后续的会等待:

Thread_Thread-1获取数据库连共耗时【0】ms
Thread_Thread-0获取数据库连共耗时【0】ms
Thread_Thread-2获取数据库连共耗时【0】ms
Thread_Thread-3获取数据库连共耗时【0】ms
Thread_Thread-4获取数据库连共耗时【0】ms
Thread_Thread-5获取数据库连共耗时【0】ms
Thread_Thread-6获取数据库连共耗时【0】ms
Thread_Thread-7获取数据库连共耗时【0】ms
Thread_Thread-8获取数据库连共耗时【0】ms
Thread_Thread-9获取数据库连共耗时【1】ms
查询数据完成,归还连接
当前有40个线程等待连接,可用连接数:0
Thread_Thread-10获取数据库连共耗时【113】ms
查询数据完成,归还连接
当前有39个线程等待连接,可用连接数:0
Thread_Thread-11获取数据库连共耗时【124】ms
查询数据完成,归还连接
当前有38个线程等待连接,可用连接数:0
Thread_Thread-12获取数据库连共耗时【141】ms

。。。省略一部分运行数据

这里可能大家有个疑问,为什么需要定义 2 个 Semaphore: useful, uselessful。其中 前者是代表可用的许可,后者代表空位。为什么这么设计?还得从 Semaphore 的机制说起:即使没有调用 acquire ,而是直接调用 release ,这样也是可以的,可以在release 的时候传入一个 new 出来的Connection 对象 !!!,假如出现这种情况的话,我们的许可证会越来越多,而不是我们初衷的限制了。

acquire 只是做流控,但是不能做同步操作,比如,它有 4 个许可证,意味着可以允许 4 个线程同时去取资源,这时候我们还是需要对存储资源的池子做同步的。 所以,我们上述的代码 takeConnect() 方法中,在 acquire 之后,还需要 synchronized (pool) 操作:

1
2
3
4
5
6
7
8
9
public Connection takeConnect() throws Exception {
//获取许可,如果没有许可可能阻塞
useful.acquire();
//省略无关代码。。。
synchronized (pool) {
connection = pool.removeFirst();
}
//省略无关代码。。。
}

七、Exchange

Exchange 的使用场景很少。它的含义是:两个线程 A 和 B,在某一位置设定一个点,当某个先执行完了,就会等待另一个线程。当 2 个线程都执行到这个点时,二者交换数据,然后接着继续执行,没错,只是交换数据。

八、Callable 、Task

我们前面说了开启一个线程有 2 种方式:new Thread 和 new Runnable 。但是,由于 Runnable 类的 run 方法是 void 类型的,没有返回值:

1
2
3
public interface Runnable {
void run();
}

因此 Java 中给准备了 Callable 这样的类,让方法有返回值:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

由于Thread 中只能接收 Runnable 参数,因此 Callable 不能直接传入使用,因此就有了 Task 这种类型,可能课程中的这张图更形象一点:

Runnable和Callable和Future

关于 Callable 的使用,我们可以参考下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CallableDemo {

private static class UseCallable implements Callable<Integer>{
private int sum;

@Override
public Integer call() throws Exception {
System.out.println("Callable 子线程开始计算");
for (int i = 0;i<5000;i++) {
sum += i;
}
System.out.println("Callable 子线程计算结束,结果:" + sum);
return sum;
}
}

public static void main(String[] args) throws Exception{
UseCallable useCallable = new UseCallable();
FutureTask<Integer> task = new FutureTask<>(useCallable);
new Thread(task).start();
System.out.println("main 方法中获取结果:" + task.get());
}
}

不过,需要注意的是,我们如果想取消 Task ,可以直接调用 task.cancel(true); ,其中传入的 true 是中断线程的标志,前面章节有说我们不建议使用 volatile 类型的变量去停止线程,还是希望使用 interrupt 去停止。所以,我们光调用 task.cancel(true);还不行,还得在线程中配合中断操作,如下所示:

1
2
3
4
5
6
7
8
9
@Override
public Integer call() throws Exception {
for (int i = 0;i<5000;i++) {
if (Thread.currentThread().isInterrupted()) {
return null;
}
//删除无关代码
}
}
谢谢你的鼓励