线程安全-要么只读,要么加锁

人类越来越厉害,发明了CPU,后来把CPU变成了多线程的,后来又把CPU变成了多核心的,甚至多个CPU在同一块主板上一起工作。为了满足人类越来越快的需求,充分利用这些计算资源,所以人们开始写多线程的程序,并发编程带来了运行上的效率,同时也带来了更多的并发问题,因此需要用更为复杂的逻辑来解决这些问题。

要么只读,要么加锁

在《java并发编程实战》中写到:所有类,只要没有标注线程安全的,一律按照线程不安全来处理。然而日常开发里却没有那么多需要一定要线程安全才行的场景,要是都写成线程安全的,那绝对是过度设计。线程安全问题只在多线程环境下才有讨论意义,单线程的串行执行不存在线程安全不安全的问题。保证高并发场景下的线程安全,无非就是要遵守:要么只读,要么加锁

加锁和共享

正确的完成一个并发程序,就是解决“在访问共享的可变状态时,能够正确的对这个状态进行管理”。因此思路很明确:当并发有问题的时候,就把有问题的代码锁起来,让这段代码能够单线程执行,只允许有一个进行访问和修改,这样一定就没有问题了。但加锁带来了新的问题,加锁导致了性能过差,cpu利用率上不去,因此需要进行改造,需要完成在多个线程能同时安全的访问某个共享对象这一功能。

加锁

日常所说的锁都是一些概念,而不是具体实现,比如

乐观锁 悲观锁
共享锁 排他锁 互斥锁 读写锁
公平锁 非公平锁
可重入锁
分段锁

以上都是用锁的特征或者锁的设计原理来表述的,都只是概念名称,并不是真正的实现。

悲观锁和乐观锁

二者的区分在于多线程情况下是否预定会产生数据影响,假定一定有影响就直接加锁,这就是悲观锁;可能没影响,尝试修改前检查,这就是乐观锁。
Java中悲观锁的体现:synchronized,不论如何,先加把锁再说。
乐观锁:CAS(Compare-And-Swap)机制,修改前检查,或者SQL中的

1
update device set order_count = 88 where device_id = 1 and order_count = 87

相对而言,乐观锁再非大量竞争的情况下,效率远高于悲观锁。很多时候有人把CAS称为自旋锁或者无锁,其实都有道理,毕竟这个是用了逻辑保证了数据的安全更新,不是强行加了阻止竞争的锁。

互斥锁和读写锁

这个和排他锁和共享锁几乎一样,可以说是排他锁和共享锁的一种实现方式。
读写锁,在写入的时候进行排他,在读取的时候进行共享,Java中有ReadWriteLock,对应的实现ReentranReadWriteLock、WeakSafeReadWriteLock等。

公平锁和非公平锁

多个线程竞争同一把锁,会出现竞争,有一个先来后到,如果需要按照一定优先级(不仅包括先后顺序的实现,还可以做其他的优先级)进行排队按顺序拿到锁,就是公平锁;无序竞争,饿死不论的情况就是非公平锁。很明显公平锁相对更复杂一些,因为还需要维护一个安全的排队对列,不仅是维护的那把锁要线程安全,排队的队列一样需要线程安全。
Java中有synchronized关键字实现的就是非公平锁,谁运气好抢到了谁先执行,老抢不到就很倒霉。ReentrantLock构造方法中可以指定是否为公平锁(默认实现的是非公平锁):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

可重入锁

感觉上应该还有一个不可重入锁才对,但是很少看到有这种东西,不可重入会带来大量的死锁问题,根本就无法使用,所以要设计一个能用的锁,一定是要可重入的。Java中的ReentrantLock synchronized 都是可重入的,Reentry本身这个单词的意思就是再次进入、折返。
Java在设计Object和synchronized关键字的时候,加入了两位标识位作为“获取计数值”和一个线程所有者标识。计数值默认是0,线程A拿到object的锁,标识当前线程作为所有者线程,在计数值上加一,发生重入时再次加一;释放锁就减一,直到减到0,删掉所有者线程标识,完全释放锁。

锁升级

为了线程竞争造成错误的问题加入了锁,使得程序多执行一些逻辑导致运行成本变高,比如把对象锁起来,每次进来之后先检查锁,线程间切换,这都是成本。然而在真实的生产环境中并不真的一直都需要加锁,这些成本是可以避免的,完全没有必要每次都向操作系统申请锁,完全可以使用更轻量级的锁,例如无锁状态(CAS)、JVM的偏向锁(其实这个也不是个锁)、JVM自己维护的轻量级锁等。用这种方式来减少向操作系统申请锁的操作。具体的可以参考JDK1.6中引入的synchronized的锁升级。

对象的共享

锁的思路是防止某个线程正在使用对象状态,而另外一个线程同时在修改该对象状态的场景出现。但日常使用中,同步是希望当一个线程修改了对象状态后,其他线程能够看到修改后的状态变化。所以对象安全的共享和加锁同等重要。
在并发程序中,使用和共享对象时,可以从以下几个维度来设计:

线程封闭 把对象封闭在一个线程内,成为一个“线程局部变量”,只有这个线程可以访问的到,常用的ThreadLocal就是如此。
对象只读共享 所有线程都可以读,都无法写,或者写了不生效,可以使用String Integer之类的不可变对象,或者做成事实性不可变对象(Effectively Immutable Object,发布完就不改了就是这种)。
线程安全共享 某些类的对象,其属性和方法在类的对象都已经实现了同步,多线程访问的时候天然的就线程安全,这种无需再加同步了。比如StringBuffer,内部加了超多synchronized。
同步和锁 以上三种都没办法做到,那只有加上同步和锁来保证并发更新操作的安全险,需要开发者自己保证多线程下逻辑没有问题。

总的来说,“要么只读要么加锁”的限制保证了多线程高并发场景下的线程安全,但加锁就会带来效率低下,只读有时候又无法保证业务需求,JDK又提供了CAS的方式来把修改时的锁轻量化,变成自旋锁或者有人叫“无锁”,这也是加锁的一种,没有逃脱“要么加锁”这一范畴。

以上内容大多都是多线程的基础和理论,java都做了很多对应的实现,都包含在JUC并发包(java.util.concurrent)中,如果有需要可以直接使用这些JDK默认提供的工具。

工具

线程同步工具类

同步的意思就是异步的反义词,线程同步就是把原先多线程执行的程序,按照指定的先后顺序来执行。
Java一开始的时候,只有wait()和notify() 用来做同步,一个线程wait()了,另外一个线程notify()来叫醒它继续,后来JUC包中又加入了很多的线程协调的场景的工具,逐步去替代wait()和notify() 。

CountDownLatch

门闩,倒数用的门闩,当到0的时候,门闩打开,任务执行,这个可以用于多个线程之间的同步,比如123线程都执行完了之后,才能唤醒第四个线程执行。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CountDownLatchDemo {  
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch=new CountDownLatch(3);//三个工人的协作
Worker worker1=new Worker("zhangsan", 1000, latch);
Worker worker2=new Worker("lisi", 2000, latch);
Worker worker3=new Worker("HaiziGe", 3000, latch);
worker1.start();//
worker2.start();//
worker3.start();//
latch.await();//等待所有工人完成工作
System.out.println("all work done at "+sdf.format(new Date()));
}

static class Worker extends Thread{
String workerName;
int workTime;
CountDownLatch latch;
public Worker(String workerName ,int workTime ,CountDownLatch latch){
this.workerName=workerName;
this.workTime=workTime;
this.latch=latch;
}
public void run(){
System.out.println("Worker "+workerName+" do work begin");
doWork();//工作了
System.out.println("Worker "+workerName+" do work complete");
latch.countDown();//工人完成工作,计数器减一
}

private void doWork(){
try {
Thread.sleep(workTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

P2P下载是个超好的例子,比如开启100个线程按数据片下载,下载完这100个之后,由主线程负责把这些数据片拼装成一个完整的文件。
CountDownLatch的底层实现是AQS,在state上做文章,state大于0时,await做阻塞,每countDown一次就减一,直到把state减到0,await阻塞打开。

CyclicBarrier

这个是环形的栏杆,每满多少个线程,就全部放行,有点像火车站春运安检,所有人排队,每20个人放进去一波进行安检,20个满了就放行,循环往复。

CyclicBarrier

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("all threads through barrier"));

for (int i = 0; i < 5; i++) {
final int finalI = i + 1;
new Thread(() -> {
System.out.println("thread " + finalI + " is started");
Random random = new Random();
try {
Thread.sleep(random.nextInt(10000) + 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread " + finalI + " has been completed");

try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

System.out.println("thread " + finalI + " to continue");
}).start();
}

相比于CountDownLatch,CyclicBarrier是可以多次执行的,CountDownLatch是一次性的。而CyclicBarrier所有任务执行状态都是一致的(相互等待),都是达到同一状态,然后集体继续执行,而CountDownLatch是等待所有线程全部处理好打开门闩去处理另外的主任务。

底层实现的原理是利用了ReentrantLock的condition,每次循环都是一个新的condition,用count标识剩余开放栅栏的此时,每次await的时候,就把count减一,减到0,触发既定任务。

Semaphore

限流使用。简单的加锁只能加一把锁,而且没有办法控制排队等着拿锁的线程数(ReentrantLock可以复杂的实现),Semaphore就可以简单实现限流,通过acquire确认通过,通过release释放,当流量达到阈值的时候acquire会阻塞,等到其余的线程release掉之后才放行。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TestCar {

//停车场同时容纳的车辆10
private static Semaphore semaphore=new Semaphore(10);

public static void main(String[] args) {

//模拟100辆车进入停车场
for(int i=0;i<100;i++){

Thread thread=new Thread(new Runnable() {
public void run() {
try {
System.out.println("===="+Thread.currentThread().getName()+"来到停车场");
if(semaphore.availablePermits() == 0){
System.out.println("车位不足,请耐心等待");
}
semaphore.acquire();//获取令牌尝试进入停车场
System.out.println(Thread.currentThread().getName()+"成功进入停车场");
Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间
System.out.println(Thread.currentThread().getName()+"驶出停车场");
semaphore.release();//释放令牌,腾出停车场车位
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},i+"号车");
thread.start();

}

}
}

其实现原理同样的还是利用了AQS,在state上做文章,acquire一下,就给state减一,减到0就阻塞住,进入排队,release一下就给state加一,大于0就唤醒队列中的某个线程,并且这是非公平的。

Phaser

分段执行任务,分段的门闩。每全体执行完一个阶段的任务之后,进入下一阶段。没啥用,知道就好了。

JUC中的锁

LuckSupport

有了wait()、notify()、await()等函数,写多线程代码的时候仍然很麻烦,需要写在synchronized里面,同时有很多的限制,不够方便,java又提供了一种更加简洁的API,LockSupport。

例如用LockSupport完成1A2B3C4D。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LockSupport1A2B3C4D {

static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {

String[] s1 = {"1","2","3","4","5","6","7"};
String[] s2 = {"A","B","C","D","E","F","G"};

t1 = new Thread(() -> {
for (String s : s1) {
System.out.print(s);
LockSupport.unpark(t2);
LockSupport.park();
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

t2 = new Thread(() -> {
for (String s : s2) {
LockSupport.park();
System.out.println(s);
LockSupport.unpark(t1);

}
});

t1.start();
t2.start();

}

}

park就停止,unpark就继续,简单方便。LockSupport具有如下的特征:

不需要加synchronized就能实现线程阻塞和唤醒

unpark可以在park之间运行,但是park了之后会抵消掉unpark的作用,不阻塞

处于等待状态的线程,连续两次park,这个线程就会阻塞死,永远无法唤醒

其实现原理是用c实现的,所用的锁是找操作系统要的重量锁,AQS中几乎所有的线程阻塞唤醒都是使用的LockSupport,在AQS源码里会经常看到它。

ReentrantLock

最开始只有synchronized,并且它还是个单纯的重量级锁的时候,Doug Lea就很讨厌这样,写了新的锁,效率更高,并且支持公平和非公平这两种,功能比synchronized强大的多,其底层也是AQS。但是它仍然有些功能没实现,比如对某个String的值进行加锁

1
2
3
4
// 这样做的坏处就是intern()会在heap里面产生空间,直接扔到老年代里面,影响fullGC
synchronized (("" + userId).intern()) {
// TODO:something
}

这种场景还是别作了,老老实实写个自己的锁拿来用好了,别没事用String.intern(),更不要作死去用synchronized锁String的Object。

ReentrantLock除了这个没实现外,其余的功能都已经超越了synchronized,同样用它实现1A2B3C4D。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ReentrantLockImpl {

static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {

String[] s1 = {"1","2","3","4","5","6","7"};
String[] s2 = {"A","B","C","D","E","F","G"};

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

t1 = new Thread(() -> {
try {
lock.lock();
for (String s : s1) {
System.out.print(s);
condition.signal();
condition.await();
TimeUnit.MILLISECONDS.sleep(300);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

});

t2 = new Thread(() -> {
try {
lock.lock();
for (String s : s2) {
condition.await();
System.out.println(s);
condition.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
t2.start();
t1.start();
}
}

ReentrantLock用完一定不要忘记unlock,并且他还有丰富的api。

功能 ReentrantLock synchronized
重入
公平 公平/非公平 非公平
加锁/解锁 手动加锁解锁 手动加锁,自动解锁
锁KEY X 锁定对象
中断 lockInterruptibly() X
尝试加锁 tryLock() X
锁超时 tryLock(timeout, unit) X
重入次数查询 getHoldCount() X
等待线程列表 getWaitingThreads() X
锁状态 isLocked() X
条件锁 多个condition,await/signal/singalAll 一个条件,wait/notify/notifyAll

虽然都说AQS实现效率高,但是实际测试发现synchronized并不比ReentrantLock差,所以重入锁并不会替代掉内置锁,只是在某些场景下更好用而已。

AQS 占坑

// todo AQS才是最复杂的东西,源码必看,这里放链接

并发容器类

Java一开始的时候,也是都写了很多线程安全的容器,HashTable、Vector之类的,后来发现大部分的场景都没有必要用这种线程安全的容器,有点浪费性能了,于是开发出了HashMap一类的线程不安全但是性能相对好得多的工具。之后为了彻底解决掉并发线程安全但性能差的问题,开发了ConcurrentHashMap、ConcurrentSkipListMap、CopyOnWriteArrayList一堆这种专用的线程安全容器,还开发了Queue的众多实现专门给常用的队列场景来用。

// todo 这个专门写一篇吧,这个是之前挖的坑,专门来填。

多线程

线程

因为线程才有了今天这篇博客,Java的每个线程都会在操作系统中也申请一个线程出来,由操作系统和JVM共同管理。

线程,启动!

线程有几种写法这个事其实和“回字的四种写法”不一样。Thread、Runable、Callable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 这种其实不符合里氏替换原则,所以不是很推荐
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread.run......running");
}
}

// 发生异常后没办法抛出,
// 只有设置了setDefaultUncaughtExceptionHandler()才能在主线程中捕获这个异常
static class MyRun implements Runnable {
@Override
public void run() {
System.out.println("MyRun.run......running");
}
}
static class MyCall implements Callable<String> {

@Override
public String call() {
System.out.println("MyCall.run......running");
return "Nick Fury is MOTHERFUCKER-MEN";
}
}

public static void main(String[] args) {
new MyThread().start();
new Thread(new MyRun()).start();
new Thread(() -> System.out.println("MyLambda.run......running")).start();

new Thread(new FutureTask<>(new MyCall())).start();

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> System.out.println("Executors.run......running"));
// 必须要写停止,要不然会一直运行下去
executorService.shutdown();

// use google guava for setThreadName
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> {
System.out.println(Thread.currentThread().getName() + ".run......running");
});
//gracefully shutdown
pool.shutdown();

}

想起来两年前,我写了一个接收XX银行每日对仗数据的TCP服务端程序,用Netty写的,用线程启动Netty,然后阻塞住,直到Netty启动的Server关闭,结果每次都启动到阻塞就卡住,不再运行其他的,怎么调试都不行,最后发现,我居然沙雕的写成了 new Thread(*).run()。。。最后找出来问题的时候,被旁边的同事嘲笑死了,到底还是年轻啊/(ㄒoㄒ)/~~

线程状态

NEW(新建状态)、RUNNABLE(就绪状态)、RUNNING(运行状态)、BLOCKED(阻塞状态)、DEAD(终止状态)五种,状态之间的转换如下图

线程状态

现代CPU都是按照时间片来执行,给每个线程一个时间片(非常小,所以CPU本身也是“伪多线程的”),执行完了就换下一个,直到线程执行完进入DEAD状态。

线程和异常

多线程运行的时候并不抛出异常,有了异常线程直接进入DEAD状态,不执行了,这时候别的等待执行的代码会进来,导致程序可能会出现一些不可控的情况,例如异常后其它线程访问到了上一个异常情况的代码等。

ThreadLocal

前段时间,我从gitee上看到了一个别人搭建的工程叫野驴,我把它改造了一下作为我自己用的基础工程,在用的过程中发现,他的日志写得其实有很大的问题,多个线程共享了同一个SysLog对象,像URL、耗时、入参出参之类的数据都有问题,都会被后来的一个线程改掉,这种情况下只要把那个SysLog对象用ThreadLocal改造掉就好了。

1
2
3
4
private ThreadLocal<SysLog> threadLocalLog = new ThreadLocal<>();
threadLocalLog.set(new SysLog());
SysLog log = threadLocalLog.get();
threadLocalLog.remove();// 用完后删掉,防止内存泄漏

ThreadLocal的原理

ThreadLocal是和Thread一起使用的,本质上是每个Thread在初始化的时候都初始化一个ThreadLocalMap(并不是Map的实现,而是自己实现的一种类似Map的结构),其Key为ThreadLocal对象本身,value为实际用的泛型对象,Key是当前ThreadLocal的弱引用,在发生GC的时候,会把只剩下弱引用的对象给回收掉,那么key就成为了null,value作为强引用依然存在,这样就出现了经常说的内存泄漏。因此ThreadLocal在get、set、remove的时候会再检查一遍是否有key为null的情况,有的话会把对应的entry释放掉,保证不产生长时间的内存泄漏。

针对ThreadLocal的内存泄漏问题,我实际在写代码中都已经规避完了,两个办法:

ThreadLocal 使用static装饰起来,保证永远都有强引用。

每次用完ThreadLocal对象后,执行remove方法。

在没有充分了解ThreadLocal之前,我就按照上面两个办法来做。。。可能是看了阿里的规范的缘故。

假设key并不是弱引用,是最常见的强引用,那么

1
2
3
private ThreadLocal<SysLog> tl = new ThreadLocal<>();
tl.set(new SysLog());
tl = null;

第二行执行之后,由于ThreadLocalMap中的key指向的为new ThreadLocal<>()对象,tl指向的也是new ThreadLocal<>()对象,当第三行执行之后,tl对象为空,准备回收tl,但是这时ThreadLocalMap中的key仍为new ThreadLocal<>()对象,导致new ThreadLocal<>()对象并不能被回收,进而Memory Leak了。

key作为弱引用就不会有这个问题,当tl变null之后,只有key一个引用指向new ThreadLocal<>(),而且还是个弱引用,可以直接被回收。但是仍然会有内存泄漏的问题,原因在上文中已经描述过了。

要使用到弱引用的根本原因就是ThreadLocal和Thread配合使用,ThreadLocal本身不存储数据,只是帮忙把ThreadLocalMap存到Thread中去的工具,并且把工具本身作为了key。

几种ThreadLocal的改造

ThreadLocal的内存泄漏问题并不是什么事,设计上也很ok,实用,还行。但是也存在着一些缺点,比如当ThreadLocalMap存在大量hash冲突的时候,查找和插入效率都会下降不少等。为了解决这种问题,有些人也做了一定的努力,比如Netty中从线程开始就全部重写掉,引入与ThreadLocal完全兼容的FastThreadLocal,甚至为了这个事完全重写了Thread。
FastThreadLocal保存了对应的index,每个index都在FastThreadLocal实例化的时候产生,进行专门的递增序列,这样把数据根据index放到FastThreadLocalThread(继承至Thread)的数组中,每次取的时候,直接就可以拿FastThreadLocal中的index直接取了,相对于线性探测的Map实现,速度相对快了不少。

ThreadLocal同时还存在着子线程访问父线程ThreadLocal失败的问题,它不支持这个功能,jdk中支持这个功能的是InheritableThreadLocal,带遗传的ThreadLocal。其原理是利用inheritableThreadLocals这个ThreadLocalsMap在进行创建子线程的时候,把父线程的inheritableThreadLocals复制(浅拷贝)一份到子线程的inheritableThreadLocals属性里。

日常使用中大量使用线程池,也就使得池化的线程生命周期超级长,此时线程是复用的,也就是说父线程中的ThreadLocal传递到子线程意义不大,反而是要在任务提交给线程池时把要传递的值传递到线程执行时。这时候可以使用阿里的TransmittableThreadLocal,简称TTL。我没用过,不写了,抽个时间再研究下原理。

线程池

Thread和ThreadLocal在JDK最初的版本中就已经有了,但是真正好用起来还是线程池的加入。池化的思想随处可以见:数据库连接成本大,线连接好放起来,取的时候直接用;httpclient连接每次都要初始化,池化它等。凡是创建和销毁耗费资源,并且重用度高的东西,池化都能带来性能上的帮助。
开启一个线程需要开辟虚拟机栈、本地方法栈、程序计数器等线程的私有空间,还需要向操作系统申请线程,销毁的时候同样要注销这些资源,所以我们需要对线程进行池化,线程池也就是如此。同时线程池还提供了一些其他的好处:

对线程进行管理,复用线程,控制线程数量等
线程全忙时,对线程友好的进行排队或者拒绝
定时任务
不同线程池之间线程隔离,减小线程间影响

阿里的Java规范中,强制要求线程池不许使用Executors,只能使用ThreadPoolExecutor这种来创建线程池,原因是Java自带的这些线程池例如FixedThreadPool、SingleThreadPool、CachedThreadPool都存在允许最大的的请求队列或者创建的线程数为2^32个,肯定回导致OOM,因此使用ThreadPoolExecutor来创建,并且需要把7个参数都写清楚,构造方法如下:

参数 用法
corePoolSize 核心线程数,线程创建一开始就有的线程数
maximumPoolSize 最大线程数,超过这个线程要进入排队队列,要大于等于核心线程数
keepAliveTime 空闲超时时间,超过这个时间没事做,会把池内的线程注销到只剩下核心线程数的线程
TimeUnit 超时时间的单位
BlockingQueue 超出的线程在此排队,可以采用LinkedBlockingQueue,必须要指定容量
ThreadFactory 创建线程的工厂,最重要的一点是给每个线程加名字,有利于调试和排查
RejectedExecutionHandler 拒绝策略,超过BlockingQueue之外的线程,走一定的拒绝策略,作为限流

写一个简单的ThreadFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MyFactory implements ThreadFactory {

private final String namePrefix;

private final AtomicInteger nextid = new AtomicInteger(1);

public MyFactory(String FeatureGroup) {
this.namePrefix = "MyFactory-" + FeatureGroup + "-worker-";
}

@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextid.getAndIncrement();
return new Thread(null, r, name,0);
}
}

写一个简单的决绝策略

1
2
3
4
5
6
7
public class MyRejectHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System. out .println ("线程池已经满了,当前申请已拒绝" + executor.toString());
}
}

拼一下,整一个简单的线程池实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyTreadPoolExecutor {

public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 3,
0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5), new MyThreadFactory("myTest"), new MyRejectHandler());

Runnable task = new MyTask();
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(task);
}
threadPoolExecutor.shutdown();

}

public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running");
}
}
}

结果输出

1
2
3
4
5
6
7
8
9
10
MyFactory-myTest-worker-2 is running
MyFactory-myTest-worker-3 is running
MyFactory-myTest-worker-1 is running
MyFactory-myTest-worker-1 is running
MyFactory-myTest-worker-1 is running
线程池已经满了,当前申请已拒绝java.util.concurrent.ThreadPoolExecutor@60e53b93[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
线程池已经满了,当前申请已拒绝java.util.concurrent.ThreadPoolExecutor@60e53b93[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
MyFactory-myTest-worker-1 is running
MyFactory-myTest-worker-1 is running
MyFactory-myTest-worker-2 is running

线程池的源码

线程池状态

线程的状态和线程池的状态是不一样的,线程池的状态定义在ThreadPoolExecutor源码的一开始,它采用32位int的高三位来标识线程的状态,采用了低29位来标识当前线程中线程的数量,这样做可以用一个int来标识了两块信息,同时又使用位运算来提高效率,网上各种人都在说用位运算怎么怎么好,怎么怎么高,可是我相信每个写过单片机程序的开发者都很自然而然的写出这种类似的位运算,毫无难度。下表表示了线程池的几种状态(其实我只关心RUNNING状态),二进制列用横线隔开了高3位和低29位:

状态 用处 二进制
RUNNING 接收新任务,旧任务继续 111-00000000000000000000000000000
SHUTDOWN 拒绝新任务,旧任务继续执行 000-00000000000000000000000000000
STOP 拒绝新任务,中断正在执行的旧任务 001-00000000000000000000000000000
TIDYING 拒绝新任务,所有任务都已中止 010-00000000000000000000000000000
TERMINATED 拒绝新任务,线程已经被清理 011-00000000000000000000000000000

这样这五个状态就可以进行大小比较,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,在判断是否是RUNNING 状态是可以直接判断是否小于SHUTDOWN。

execute()

添加一个线程,作者在注释中写清了步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 拿到线程数和线程池状态
int c = ctl.get();
// 线程数未到核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建一个核心线程,并执行,创建成功就返回
if (addWorker(command, true))
return;
// 创建失败了,说明外部有人动了线程池或者线程池自己状态变化了。
// 这里是个Double-Check的思想
c = ctl.get();
}
// 核心线程满了,判断线程池是否是RUNNING状态,是的话就加到workQueue里面
if (isRunning(c) && workQueue.offer(command)) {
// Double-Check
int recheck = ctl.get();
// 在执行的过程中,是不是被shutdown了,是的话就直接拒绝掉
if (!isRunning(recheck) && remove(command))
reject(command);
// 核心线程可以设置为0,这样就会导致都进入等待队列,永远都无法运行
// 这里添加一个null,可以保证有一个线程在运行
// 这个null之后就会从workQueue中取新的线程来执行
// 而command已经被加到command中去,所以这里是作者的小心思
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 线程池不是RUNNING状态,加到
else if (!addWorker(command, false))
reject(command);
}

execute()方法还算比较简单,主要是需要经常考虑到线程池的状态被其他线程中间改掉了,发送脏读脏写的事。

addworker()

addworker()本身包含了作者太多的小心思,很多都是他精炼了代码来巧妙设计逻辑,这样就牺牲了超多的可读性,代码也不再是自解释的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层自旋,用于goto语句的再次continue和break
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
// (rs > SHUTDOWN) ||
// (rs == SHUTDOWN && firstTask != null) ||
// (rs == SHUTDOWN && workQueue.isEmpty())
// 1. 线程池状态大于SHUTDOWN时,直接返回false
// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

// 内层自旋
for (;;) {
int wc = workerCountOf(c);
// worker数量超过容量,直接返回false
if (wc >= CAPACITY
|| wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS的方式增加worker数量。
// 若增加成功,则直接跳出外层循环进入到第二部分
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变化,对外层循环进行自旋
if (runStateOf(c) != rs)
continue retry;
// 其他情况,直接内层循环进行自旋即可
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必须是串行的,因此需要加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 这儿需要重新检查线程池状态
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker已经调用过了start()方法,则不再创建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker创建并添加到workers成功
workers.add(w);
// 更新`largestPoolSize`变量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动worker线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

去深究每一行代码的时候其实意义不是很大,读源码的意义在于了解作者是如何实现的,了解这么实现的优劣,在使用的时候避开比较差的使用方法,而不在于完全解读作者的每一行代码每一个技巧,这么做意义不大。
我找了以上这个理由暂停了线程源码的阅读。

0%