前言
首先提出两个问题,1.什么是线程池?2.为什么要使用线程池?
什么是线程池
顾名思义,一个池子,里面放的都是线程。
规范点说,线程池是一个存储线程的容器,将线程先创建好后,放入池中交由线程池去管理维护。
为什么使用线程池
借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
比喻一下:不用线程池的时候,程序里运行的那些线程,就像海里的那些鱼儿,不好管理,难以维护控制,说不定还给跑没影了;用线程池的话,就像把那些鱼儿给放入了一个大池塘里,圈养起来,方便管理。
其实管理只是一方面,上面也提到了,还有资源消耗以及响应速度,一个线程可以复用,如果不复用,不断地创建线程->销毁线程也是很消耗资源的,而且创建也需要时间。
而且现在的程序世界里,池化技术无处不在
数据库有数据库连接池,比如常用的阿里的Druid、 DBCP(依赖于commons-pool )、 C3P0
http有http连接池,比如PoolingHttpClientConnectionManager
这两种目的都比较像,复用连接,管理连接,像http池,复用的话就省去TCP3次握手4次挥手的时间,而且连接也不用人为的释放创建,其实都一个意思,就是交由池子去管理维护.
ThreadPoolExecutor类分析
首先看一下ThreadPoolExecutor类结构
Executor接口中定义了一个方法void execute(Runnable command),用于执行线程
ExecutorService接口中定义了shutdown、shutdownNow、isShutdown、isTerminated、awaitTermination、submit、invokeAll、invokeAny,这些方法,用于管理线程以及查看状态
AbstractExecutorService抽象类只实现了invoke,submit,newTaskFor方法,抽象类可以不实现接口的方法
ThreadPoolExecutor类则是我们要使用的线程池,就是具体的实现了
下面挑一个ThreadPoolExecutor类中参数最多的构造函数说一下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor参数分析
最重要的三个参数
- corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
其他几个参数
- keepAliveTime: 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
- unit: keepAliveTime 参数的时间单位。
- threadFactory: executor 创建新线程的时候会用到。
- handler: 饱和策略,关于饱和策略下面单独介绍一下,四种(AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy)
ThreadPoolExecutor饱和策略
如果当前同时运行的线程数量达到最大线程数量,并且队列也已经被放满了任务时,ThreadPoolExecutor定义了四种策略,都是 ThreadPoolExecutor中的静态内部类
- ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝新任务的处理。
- ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
- ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
- ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
ThreadPoolExecutor的使用
下面放一个简单练习的代码,注意其中的各项参数值,还有饱和策略采用的是CallerRunsPolicy
public class Main {
private static final int CORE_POOL_SIZE = 3; //核心池线程运行数量
private static final int MAXIMUM_POOL_SIZE = 3; //最大线程运行数量
private static final int QUEUE_CAPACITY = 5; //线程队列容量
private static final Long KEEP_ALIVE_TIME = 1L; //保持存活时间 单位为秒
public static void main(String[] args) {
//创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 10; i++) {
Runnable worker = new MyRunnable(i);
try {
//execute用于执行Runnable
executor.execute(worker);
} catch (Exception e) {
//加入任务到池中后,因饱和策略而定可能抛出异常
e.printStackTrace();
}
}
//关闭线程池,会等待线程执行完成
executor.shutdown();
//等待线程池结束
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}
class MyRunnable implements Runnable {
public int id;
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss");
System.out.printf(Thread.currentThread().getName() + " Runnable[%d] start time:%s \n",
id, df.format(new Date()));
process();
System.out.printf(Thread.currentThread().getName() + " Runnable[%d] end time:%s \n",
id, df.format(new Date()));
}
private void process() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
main Runnable[8] start time:11:23:11
pool-1-thread-1 Runnable[0] start time:11:23:11
pool-1-thread-2 Runnable[1] start time:11:23:11
pool-1-thread-3 Runnable[2] start time:11:23:11
pool-1-thread-3 Runnable[2] end time:11:23:16
main Runnable[8] end time:11:23:16
pool-1-thread-2 Runnable[1] end time:11:23:16
pool-1-thread-1 Runnable[0] end time:11:23:16
pool-1-thread-2 Runnable[4] start time:11:23:16
pool-1-thread-3 Runnable[3] start time:11:23:16
pool-1-thread-1 Runnable[5] start time:11:23:16
pool-1-thread-1 Runnable[5] end time:11:23:21
pool-1-thread-2 Runnable[4] end time:11:23:21
pool-1-thread-3 Runnable[3] end time:11:23:21
pool-1-thread-2 Runnable[7] start time:11:23:21
pool-1-thread-1 Runnable[6] start time:11:23:21
pool-1-thread-3 Runnable[9] start time:11:23:21
pool-1-thread-3 Runnable[9] end time:11:23:26
pool-1-thread-1 Runnable[6] end time:11:23:26
pool-1-thread-2 Runnable[7] end time:11:23:26
Finished all threads
是不是发现输出结果中有一个main Runnable[8],这是因为采用的饱和策略 CallerRunsPolicy , 它会调用执行自己的线程运行任务。
我们再将饱和策略改为AbortPolicy,队列满时会抛出异常,如下
java.util.concurrent.RejectedExecutionException: Task MyRunnable@330bedb4 rejected from java.util.concurrent.ThreadPoolExecutor@2503dbd3[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at Main.main(Main.java:43)
java.util.concurrent.RejectedExecutionException: Task MyRunnable@7ea987ac rejected from java.util.concurrent.ThreadPoolExecutor@2503dbd3[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at Main.main(Main.java:43)
pool-1-thread-2 Runnable[1] start time:11:26:56
pool-1-thread-1 Runnable[0] start time:11:26:56
pool-1-thread-3 Runnable[2] start time:11:26:56
pool-1-thread-3 Runnable[2] end time:11:27:01
pool-1-thread-2 Runnable[1] end time:11:27:01
pool-1-thread-1 Runnable[0] end time:11:27:01
pool-1-thread-2 Runnable[4] start time:11:27:01
pool-1-thread-3 Runnable[3] start time:11:27:01
pool-1-thread-1 Runnable[5] start time:11:27:01
pool-1-thread-3 Runnable[3] end time:11:27:06
pool-1-thread-2 Runnable[4] end time:11:27:06
pool-1-thread-3 Runnable[6] start time:11:27:06
pool-1-thread-2 Runnable[7] start time:11:27:06
pool-1-thread-1 Runnable[5] end time:11:27:06
pool-1-thread-2 Runnable[7] end time:11:27:11
pool-1-thread-3 Runnable[6] end time:11:27:11
Finished all threads
ThreadPoolExecutor的execute方法
为了更好理解线程池,我们看一下他的execute方法,首先放张图便于理解
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
利用Executors创建线程池
一个可根据实际情况调整线程数量的线程池,线程池的线程数量不确定,但若有空闲线程可以复用,
则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。
所有线程在当前任务执行完毕后,将返回线程池进行复用。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
一个固定线程数量的线程池,该线程池中的线程数量始终不变。当有一个新的任务提交时,
线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,
待有线程空闲时,便处理在任务队列中的任务。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
一个固定数量的线程池,支持定时及周期性任务执行,scheduledThreadPool.scheduleWithFixedDelay()方法设置周期
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
只有一个线程的线程池,若多余一个任务被提交到该线程池,
任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();