0%

Java筑基-:(09-1)线程基本使用入门-1

一、错误加锁的问题

Java 的 Object 类中有 hashCode() 方法,它是用对象在内存中的地址来产生这个哈希值,当然,我们可以重写 hashCode() 方法。如果在重写了 hashCode() 方法之后还想获取到这个由内存中地址产生的哈希值,可以使用 identityHashCode() 方法。

在使用 synchronized 关键字去做同步的时候,要确保 synchronized 里面的对象是同一个对象,否则会达不到效果,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
Work work = new Work(1);
for (int i = 0; i < 5; i++) {
new Thread(work).start();
}
}

static class Work implements Runnable {
Integer i;

public Work(Integer integer) {
i = integer;
}

@Override
public void run() {
synchronized (i) {
i++;
System.out.println("当前 i = " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

本来预期输出 2~6 的值,但是这段代码输出的数字是不可控的。这是为什么呢?问题就在 i ++ 上, 反编译可以看到, i ++ 之后就会生成新的 Integer 对象,这样一来,导致后续的程序获取的 Integer 就不是同一个对象了。

改进方法可以是自己创建一个 Object obj = new Object() ;因为这个对象不会改变,所以我们可以用它来作为锁对象。

1
2
3
4
5
6
7
8
9
10
static class Work implements Runnable {

Object obj = new Object();
@Override
public void run() {
synchronized (obj) {
//省略代码。。。
}
}
}

二、volatile 关键字

比较简单,略

三、ThreadLocal 使用

Spring 在事务中用到了 ThreadLocal ,为什么要这么做?这是因为我们想要为每个线程保存自己的数据库 connection (连接)。这样比较容易控制事务的边界。

关于 ThreadLocal 需要注意的有 2 点:

  • ThreadLocal 能够给初始值

  • 每个线程单独操作自己的线程中的ThreadLocalMap ,所以压根不会有冲突

1
2
3
4
5
6
7
//ThreadLocal 给初始值
private static ThreadLocal<Integer> num = new ThreadLocal<>(){
@Override
protected Integer initialValue() {
return 1;
}
};

四、ThreadLocal 的实现

如果要理解 ThreadLocal 的原理,就从它最简单的 get() 方法开始看(以下代码是基于我本地的 JDK 20 ,不过和前面版本相差也不太大):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public T get() {
//Thread.currentThread() 表示当前线程
return get(Thread.currentThread());
}

private T get(Thread t) {
//根据当前线程,获得 ThreadLocalMap 对象
ThreadLocal.ThreadLocalMap map = getMap(t);
//。。。 省略无关代码

//这里getEntry 传入了 this ,也就是以 ThreadLocal 本身为 key
ThreadLocal.ThreadLocalMap.Entry e = map.getEntry(this);
T result = (T) e.value;
return result;
}

也就是说,首先根据当前线程的 thread 对象 获得 ThreadLocalMap 对象 map ,之后,以ThreadLocal 自身(那个this) 为 key ,从 map 中取出 value ,这个 value 就是我们想要的值了,来看下 getMap 的实现:

1
2
3
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

这里直接返回了 Thread.threadLocals,这意味着 ThreadLocalMap 类型的 map 是 Thread 的成员变量 !那就没什么好说的了,每个 thread 对象自己都有这么个成员变量,每次操作的时候都是对线程对象 thread 自己的 ThreadLocalMap 类型的成员变量 threadLocals 进行操作,当然不会有线程问题了。

这里还有个问题需要澄清,就是如果定义了多个 ThreadLocal 对象 threadLocal1、threadLocal2… ,那么又是怎么保存的呢?我们来看下 ThreadLocal 的内部类 ThreadLocalMap 就知道了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadLocal<T> {
// 。。。 省略无关代码
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

//The initial capacity -- MUST be a power of two.
private static final int INITIAL_CAPACITY = 16;

private ThreadLocal.ThreadLocalMap.Entry[] table;
}
// 。。。 省略无关代码
}

ThreadLocal 中有个 ThreadLocalMap 类,并且 ThreadLocalMap 中有个 Entry [] 数组类型的 table ,用来存放所有的 Entry 。

这里有个细节, Entry [] 数组的初始大小是16,并且官方建议这个值应该是 2^n 这样的值

上面看到 ThreadLocal 的 get() 方法最终会调用 ThreadLocalMap 的 getEntry 方法,看ThreadLocalMap.getEntry 应该更能理解:

1
2
3
4
5
6
7
8
9
10
11
12
# ThreadLocalMap 类
private ThreadLocal.ThreadLocalMap.Entry getEntry(ThreadLocal<?> key) {
//根据 key 获取 在数组 table 中的index
int i = key.threadLocalHashCode & (table.length - 1);
//根据 index 获取 table[index]
ThreadLocal.ThreadLocalMap.Entry e = table[i];
//处理可能的 hash 冲突和空值
if (e != null && e.refersTo(key))
return e;
else
return getEntryAfterMiss(key, i, e);
}

看了 get() 方法之后,ThreadLocal 的 set 的逻辑应该也能很容易理解了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# java.lang.ThreadLocal 

public void set(T value) {
//将当前线程传过去
set(Thread.currentThread(), value);
}

private void set(Thread t, T value) {
//根据当前线程获取它的 ThreadLocalMap 对象
ThreadLocal.ThreadLocalMap map = getMap(t);
//。。。省略无关代码
if (map != null) {
//ThreadLocalMap 对象不空,调用其 set 方法,以ThreadLocal 本身作为 key
map.set(this, value);
} else {
//如果 ThreadLocalMap 对象还是空的,就创建个
createMap(t, value);
}
}

最后调用到 ThreadLocalMap 的 set 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void set(ThreadLocal<?> key, Object value) {
//前面说的 Entry[] 数组
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
//通过某种方式获取到即将插在数组中的位置 i
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

//省略无关代码。。。
//可能位置 i 会有冲突,就 nextIndex,也有可能直接覆盖之前的值
//nextIndex 其实就是 + 1 操作,如果超出了数组长度,就又从 0 开始

//创建 Entry 对象,存入value,然后放入 tab[i] 中
tab[i] = new ThreadLocal.ThreadLocalMap.Entry(key, value);

//省略无关代码。。。
}

就是经过某种方法计算出在数组中的位置 i (可能会有位置冲突,就需要重新计算,或者之前有设置过了,就需要覆盖),然后创建 Entry 实体,存入 table[i] 中。

经过以上代码就整个流程能够串起来了:

  1. ThreadLocalMap 是 ThreadLocal 的内部类,ThreadLocalMap 中有一个 Entry[] 数组,用于存放(该线程)所有使用 ThreadLocal 存放的值(以 ThreadLocal 对象本身为key ,得出 index, 再根据 index 从 Entry 数组中取值)

  2. 每个线程对象 thread 都有一个 ThreadLocalMap 对象 threadLocals

  3. 当创建一个 ThreadLocal 对象,调用其 set 方法时,首先检测当前线程是否存在threadLocals,不存在就创建

  4. 然后以 ThreadLocal 对象本身为 key ,设置的值为 value ,存入ThreadLocalMap 中的 Entry[] 数组中(会有冲突处理过程)

  5. 当从某个线程中 get 某个 ThreadLocal 对象的值时,首先获取到该线程,然后取其 threadLocals 对象

  6. 之后以 ThreadLocal 对象本身作为 key ,调用 threadLocals 对象的 getEntry 方法

  7. 根据 key 计算该 Entry 在 table 数组中可能的位置 index ,这个 index 可能会冲突, 如果冲突了就会做前面说的 nextIndex 操作(其实就是 + 1 操作,如果超出了数组长度,就又从 0 开始),重新获取新的 index 再次尝试

  8. 最后,通过获取的 index 从 Entry[] 数组类型的变量 table 中获取 Entry: Entry entry = table[index]

五、ThreadLocal 可能造成内存泄漏

ThreadLocal 用不好会造成内存泄露。

在使用完后,需要调用remove方法将其Entry移除,否则会导致内存泄露,直到线程对象 thread 被销毁才能被回收。首先看下 ThreadLocal 的原理图 :

ThreadLocal内存泄漏示意图

结合源码我们知道,Entry类是继承WeakReference的,当 ThreadLocal 对象没有被引用之后,可能就被回收了,所以 Entry 中的key就成为了 null ,不可能再被使用到了。

但是,由于我们没有调用 remove 方法(事实上,get和set方法调用过程也可能触发清理 key 为 null 的 Entry),这个 Entry 实例还是存在的,虽然 key 没有了,value 还是在的,所以造成了泄露。

在ThreadLocal 新增或者删除一个元素的时候,会触发这种 key 为 null 的 Entry 扫描操作,这也是不断添加 ThreadLocal 对象并不断移除这些 ThreadLocal 对象时查看内存变化会维持在一个不高不低的阶段的原因。

六、ThreadLocal 可能引发线程不安全

当 ThreadLocal 中使用的是一个静态变量时,可能产生线程不安全的情况,因为各个线程大家的value都是指向同一份引用,比如如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ThreadLocalUnsafe implements Runnable{

static Number number = new Number(0);

@Override
public void run() {
number.value = number.value + 1;
threadLocal.set(number);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

System.out.println(Thread.currentThread().getName() + "中 integer 的值 为:" + threadLocal.get().value);

}

public static ThreadLocal<Number> threadLocal = new ThreadLocal<>();

public static void main(String[] args) {

for (int i = 0 ;i<5;i++) {
new Thread(new ThreadLocalUnsafe()).start();
}
}

static class Number {
public int value;
public Number(int num){
this.value = num;
}
}
}

在我运行后,输出全部是 5 :

Thread-4中 integer 的值 为:5
Thread-0中 integer 的值 为:5
Thread-3中 integer 的值 为:5
Thread-2中 integer 的值 为:5
Thread-1中 integer 的值 为:5

一般来说,number 初始值是 0 ,每个线程给它 + 1 操作后存入 ThreadLocal 中,后续通过 threadLocal 取出来的时候值应该不一样才对,但是我们看到的是各个线程的值都是一样的。这是因为 ThreadLocal 在存入 value 的时候,存入的都是 number 这同一个对象,所以只要一处改了,就到处都改了,这种情况要注意。

七、线程配合

wait 和 notify 都要在同步代码块中使用,他们有标准的使用范式,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
//wait 过程
synchronized (对象) {
while (条件不满足) {
对象.wait();
}
//业务逻辑
}

//notify 过程
synchronized (对象) {
//业务逻辑改变条件
对象.notifyAll();
}

wait在休眠之前就会释放锁,而notify 与notifyAll不会,他调用了之后,还要等同步区间执行完了,才释放锁。

范式中间为什么要用while循环而不是if判断条件?

如果对象执行 notifyAll ,那么很多线程都会被唤醒,比如是 5 个线程在等待打印机,当某个线程打印完成 notifyAll 的时候,这 5 个线程都被唤醒,由于是使用 if 语句判断的,唤醒之后直接执行后续的代码了,即业务逻辑(打印)了,此时就会出问题。但是如果是while 循环的话,唤醒之后还会判断条件是否满足(打印机是否被别人占了),这样逻辑才正确。

八、等待超时处理

线程等待超时处理,比如使用池化技术的时候,我们去从 pool 中获取资源,但是会有超时提醒,这时候就能用 wait/notify 去做这样的事情。视频中的例子主要需要注意 remain 时长计算,因为唤醒一次就要重新计算等待时长,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
if (mills < 0) {//永不超时
while(pool.isEmpty()) {
wait();
}
return pool.removeFirst();
} else {
//超时时刻
long future = System.currentTimeMillis() + mills;
//等待时长
long remain = mills;
while(pool.isEmpty() && remain > 0) {
wait(remain);
//重新计算等待时长
remain = future - System.currentTimeMillis();
}
Connection connection = null;
if (!pool.isEmpty()) {
connection = pool.removeFirst();
}
return connection;
}
}
}

为什么需要重新计算?因为唤醒后可能抢不到锁,还会在while中接着等待。

谢谢你的鼓励