Java线程池

[TOC]

1.为什么需要使用线程池

  合理利用线程池能够带来三个好处。
  第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即行。
  第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2.线程池的创建

  可以通过 ThreadPoolExecutor 来创建一个线程池。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

  corePoolSize表示线程池的基本大小:当提交一个任务到线程池时,如果线程池中的线程数小于corePoolSize,就会创建一个新线程来执行任务, 即使有空闲的线程,直到线程池中的线程数量等到corePoolSize。如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程。
  maximumPoolSize是线程池最大大小,线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
  keepAliveTime:线程池中的空闲线程所能持续的最长时间。
  workQueue任务队列,用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

  1. ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列, 此队列按 FIFO(先进先出)原则对元素进行排序。
  2. LinkedBlockingQueue: 一个基于链表结构的阻塞队列, 此队列按 FIFO (先进先出) 排序元素, 吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  3. SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态, 吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
  4. PriorityBlockingQueue: 一个具有优先级得无限阻塞队列。

  threadFactory:线程工厂,主要用来创建线程;
  handler:表示当拒绝处理任务时的策略,(也就是中断策略,由于线程池中的线程容器已经放不的任务了,饱和了,必须要有一个相应的策略来处理)有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,(中止)它是默认的策略。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常,(遗弃)策略,它默认会放弃这个任务。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)(遗弃最旧的)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,(调用者运行), 它既不会丢弃任务,也不会抛出任何异常,它会把任务推回到调用者那里去,以此缓解任务流

  

3.向线程池提交任务

  可以使用 execute 提交的任务,但是 execute 方法没有返回值,所以无法判断任务知否被线程池执行成功。

1
2
3
4
5
6
threadPoolExecutor.execute(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
}
});

  也可以使用 submit 方法来提交任务,它会返回一个 future, 通过这个 future 来判断任务是否执行成功,通过 future 的 get 方法来获取返回值, get 方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

1
2
3
4
5
6
7
8
9
10
11
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}

  

4.线程池的关闭

  通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池,它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。 但是它们存在一定的的区别, shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
  只要调用了这两个关闭方法的其中一个, isShutdown 方法就会返回 true。当所有的任务都已关闭后, 才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。

  

5.线程池的分析

  线程池的主要工作流程如图:

  从上图可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
  1) 首先线程池判断基本线程池是否已满?没满, 创建一个工作线程来执行任务。 满了,则进入下个流程。
  2) 其次线程池判断工作队列 是否已满?没满,则将新提交的任务存储在工作队列里。 满了,则进入下个流程。
  3) 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

  

6.Executors

  一般而言,创建线程池都是使用Executors提供的一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
  ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。可以调用ExecutorService的shutdown()方法来关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此一般用该接口来实现和管理多线程。
  ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

  Executors的工厂方法:

1
2
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
1
2
3
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。
如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
1
2
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
1
2
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

  这四种方法都是用的Executors中的ThreadFactory建立的线程,下面就以上四个方法做个比较
  newCachedThreadPool(): -缓存型池子,先查看池中有没有空闲的线程,如果有,就reuse;如果没有,就建一个新的线程加入池中。
  newFixedThreadPool(int): -newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程。其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。fixed池线程数固定,并且是0秒IDLE(无IDLE),cache池线程数支持0-Integer.MAX_VALUE,60秒IDLE。
  newScheduledThreadPool(int): -调度型线程池,这个池子里的线程可以按schedule依次delay执行,或周期执行。
  newSingleThreadExecutor():-单例线程,任意时间池中只能有一个线程。用的是和cache池和fixed池相同的底层池,但线程数目是1,0秒IDLE(无IDLE)

  

7.线程池的监控

  ThreadPoolExecutor提供接口获取内部运行情况,用于监控线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 线程池需要执行的任务数
*/
long taskCount = threadPoolExecutor.getTaskCount();
/**
* 线程池在运行过程中已完成的任务数
*/
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
/**
* 曾经创建过的最大线程数
*/
long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
/**
* 线程池里的线程数量
*/
long poolSize = threadPoolExecutor.getPoolSize();
/**
* 线程池里活跃的线程数量
*/
long activeCount = threadPoolExecutor.getActiveCount();

  注意 getLargestPoolSize 返回在这个线程池里曾经同时存在过的最大线程数。largestPoolSize是历史最大值,只增不减。

8.如何设置线程数

1、CPU密集型:一般线程数设置为:CPU核数 + 1。
2、IO密集型:((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目,比如CPU核数为4核,一个任务线程cpu耗时为20ms,IO耗时80ms,那最佳线程数目:( 80 + 20 )/20 * 4 = 20。