Java之线程池ThreadPoolExecutor

发布于 2020-01-22  160 次阅读


前言

首先提出两个问题,1.什么是线程池?2.为什么要使用线程池?

什么是线程池

顾名思义,一个池子,里面放的都是线程。

规范点说,线程池是一个存储线程的容器,将线程先创建好后,放入池中交由线程池去管理维护。

为什么使用线程池

借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

比喻一下:不用线程池的时候,程序里运行的那些线程,就像海里的那些鱼儿,不好管理,难以维护控制,说不定还给跑没影了;用线程池的话,就像把那些鱼儿给放入了一个大池塘里,圈养起来,方便管理。

其实管理只是一方面,上面也提到了,还有资源消耗以及响应速度,一个线程可以复用,如果不复用,不断地创建线程->销毁线程也是很消耗资源的,而且创建也需要时间。

而且现在的程序世界里,池化技术无处不在

数据库有数据库连接池,比如常用的阿里的Druid、 DBCP(依赖于commons-pool )、 C3P0

http有http连接池,比如PoolingHttpClientConnectionManager

这两种目的都比较像,复用连接,管理连接,像http池,复用的话就省去TCP3次握手4次挥手的时间,而且连接也不用人为的释放创建,其实都一个意思,就是交由池子去管理维护.

ThreadPoolExecutor类分析

首先看一下ThreadPoolExecutor类结构

ThreadPoolExecutor类结构

Executor接口中定义了一个方法void execute(Runnable command),用于执行线程

ExecutorService接口中定义了shutdown、shutdownNow、isShutdown、isTerminated、awaitTermination、submit、invokeAll、invokeAny,这些方法,用于管理线程以及查看状态

AbstractExecutorService抽象类只实现了invoke,submit,newTaskFor方法,抽象类可以不实现接口的方法

ThreadPoolExecutor类则是我们要使用的线程池,就是具体的实现了

下面挑一个ThreadPoolExecutor类中参数最多的构造函数说一下

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

ThreadPoolExecutor参数分析

最重要的三个参数

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

其他几个参数

  • keepAliveTime: 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  • unit: keepAliveTime 参数的时间单位。
  • threadFactory: executor 创建新线程的时候会用到。
  • handler: 饱和策略,关于饱和策略下面单独介绍一下,四种(AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy)

ThreadPoolExecutor饱和策略

如果当前同时运行的线程数量达到最大线程数量,并且队列也已经被放满了任务时,ThreadPoolExecutor定义了四种策略,都是 ThreadPoolExecutor中的静态内部类

  • ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

ThreadPoolExecutor的使用

下面放一个简单练习的代码,注意其中的各项参数值,还有饱和策略采用的是CallerRunsPolicy

public class Main {
    private static final int CORE_POOL_SIZE = 3; //核心池线程运行数量
    private static final int MAXIMUM_POOL_SIZE = 3; //最大线程运行数量
    private static final int QUEUE_CAPACITY = 5; //线程队列容量
    private static final Long KEEP_ALIVE_TIME = 1L; //保持存活时间 单位为秒

    public static void main(String[] args) {
        //创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for (int i = 0; i < 10; i++) {
            Runnable worker = new MyRunnable(i);
            try {
                //execute用于执行Runnable
                executor.execute(worker);
            } catch (Exception e) {
                //加入任务到池中后,因饱和策略而定可能抛出异常
                e.printStackTrace();
            }
        }
        //关闭线程池,会等待线程执行完成
        executor.shutdown();
        //等待线程池结束
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

class MyRunnable implements Runnable {
    public int id;
    public MyRunnable(int id) {
        this.id = id;
    }
    @Override
    public void run() {
        SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss");
        System.out.printf(Thread.currentThread().getName() + " Runnable[%d] start time:%s \n",
                id, df.format(new Date()));
        process();
        System.out.printf(Thread.currentThread().getName() + " Runnable[%d] end time:%s \n",
                id, df.format(new Date()));
    }
    private void process() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
main Runnable[8] start time:11:23:11 
pool-1-thread-1 Runnable[0] start time:11:23:11 
pool-1-thread-2 Runnable[1] start time:11:23:11 
pool-1-thread-3 Runnable[2] start time:11:23:11 
pool-1-thread-3 Runnable[2] end time:11:23:16 
main Runnable[8] end time:11:23:16 
pool-1-thread-2 Runnable[1] end time:11:23:16 
pool-1-thread-1 Runnable[0] end time:11:23:16 
pool-1-thread-2 Runnable[4] start time:11:23:16 
pool-1-thread-3 Runnable[3] start time:11:23:16 
pool-1-thread-1 Runnable[5] start time:11:23:16 
pool-1-thread-1 Runnable[5] end time:11:23:21 
pool-1-thread-2 Runnable[4] end time:11:23:21 
pool-1-thread-3 Runnable[3] end time:11:23:21 
pool-1-thread-2 Runnable[7] start time:11:23:21 
pool-1-thread-1 Runnable[6] start time:11:23:21 
pool-1-thread-3 Runnable[9] start time:11:23:21 
pool-1-thread-3 Runnable[9] end time:11:23:26 
pool-1-thread-1 Runnable[6] end time:11:23:26 
pool-1-thread-2 Runnable[7] end time:11:23:26 
Finished all threads

是不是发现输出结果中有一个main Runnable[8],这是因为采用的饱和策略 CallerRunsPolicy , 它会调用执行自己的线程运行任务

我们再将饱和策略改为AbortPolicy,队列满时会抛出异常,如下

java.util.concurrent.RejectedExecutionException: Task MyRunnable@330bedb4 rejected from java.util.concurrent.ThreadPoolExecutor@2503dbd3[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at Main.main(Main.java:43)
java.util.concurrent.RejectedExecutionException: Task MyRunnable@7ea987ac rejected from java.util.concurrent.ThreadPoolExecutor@2503dbd3[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at Main.main(Main.java:43)
pool-1-thread-2 Runnable[1] start time:11:26:56 
pool-1-thread-1 Runnable[0] start time:11:26:56 
pool-1-thread-3 Runnable[2] start time:11:26:56 
pool-1-thread-3 Runnable[2] end time:11:27:01 
pool-1-thread-2 Runnable[1] end time:11:27:01 
pool-1-thread-1 Runnable[0] end time:11:27:01 
pool-1-thread-2 Runnable[4] start time:11:27:01 
pool-1-thread-3 Runnable[3] start time:11:27:01 
pool-1-thread-1 Runnable[5] start time:11:27:01 
pool-1-thread-3 Runnable[3] end time:11:27:06 
pool-1-thread-2 Runnable[4] end time:11:27:06 
pool-1-thread-3 Runnable[6] start time:11:27:06 
pool-1-thread-2 Runnable[7] start time:11:27:06 
pool-1-thread-1 Runnable[5] end time:11:27:06 
pool-1-thread-2 Runnable[7] end time:11:27:11 
pool-1-thread-3 Runnable[6] end time:11:27:11 
Finished all threads

ThreadPoolExecutor的execute方法

为了更好理解线程池,我们看一下他的execute方法,首先放张图便于理解