创建线程池

使用Executors创建线程池

Java 1.5引入了Executors API,通过该API可以直接创建线程池。

1
2
3
4
5
6
7
8
9
10
11
//创建一个可重用的线程池,任务会选择线程池中已创建的线程,如果没有可用线程,则会在线程池中创建新线程,如果线程60秒内没被使用,将被关闭并移除
Executors.newCachedThreadPool();

//创建一个固定大小的线程池,支持定时以及周期任务
Executors.newScheduledThreadPool(10);

//创建单个工作线程的Executor,所有提交的任务会按照提交的顺序依次执行
Executors.newSingleThreadExecutor();

//创建固定长度的线程池,任何时候最多只会创建指定个数的线程来执行任务,如果提交任务时没有可用线程,该任务将会的队列中等待,知道线程可用
Executors.newFixedThreadPool(10);

手动创建线程池

虽然通过Executors API可以很方便的创建线程池, 但是阿里巴巴Java开发手册中确是不允许这么做的,提示我们Executors存在OOM的风险,应使用ThreadPoolExecutor手动创建线程池。

mark

ThreadPoolExecutor构造函数如下:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//....
}
  • corePoolSize:核心线程池数量,除非通过allowCoreThreadTimeOut()设置超时,否则无论线程是否闲置,线程池中都将保留该数量的线程
  • maximumPoolSize:线程池所允许线程的最大数量
  • keepAliveTime:存活时间,当线程池内的线程数量超过corePoolSize时,闲置线程等待的时间。超过该时间线程将被关闭
  • unit:keepAliveTime的单位
  • workQueue:存放未处理的任务队列
  • ThreadFactory:executor创建新线程时使用的ThreadFactory
  • RejectedExecutionHandler:当线程都在使用且队列内等待的任务数量达到队列上限时调用

线程池核心以及最大数量

ThreadPoolExecutor会根据corePoolSizemaximumPoolSize自动调整线程池大小,当提交新任务时,若当前线程数小于corePoolSize,无论是否有闲置线程都将创建新的线程。如果线程数介于corePoolSizemaximumPoolSize之间,只有不存在闲置线程时才会创建新线程。

预先启动核心线程

默认情况下,ThreadPoolExecutor只会创建线程,只有通过execute()提交任务时才会启动线程。如果参数传入的为非空队列,可通过prestartCoreThread()直接启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread());
}
};

queue.add(runnable);

ThreadPoolExecutor executor = new ThreadPoolExecutor(1,5,60,TimeUnit.SECONDS,queue);

executor.prestartCoreThread();//注释掉该行上面的runnable不会执行
executor.shutdown();

创建新线程

线程池通过参数中传入的ThreadFactory创建新线程,默认为Executors.defaultThreadFactory(),使用自定义ThreadFactory可以为线程指定有意义的名称,方便出错时定位,还可以设置线程优先级,设置为守护线程等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

AtomicInteger i = new AtomicInteger(1);

//自定义ThreadFactory
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(Thread.currentThread().getThreadGroup(),r,"my_thread_" + i.getAndIncrement(),0);
return thread;
}
};

ThreadPoolExecutor executor = new ThreadPoolExecutor(1,5,5,
TimeUnit.SECONDS,queue,threadFactory);

executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});

executor.shutdown();

阻塞队列

BlockingQueue用来存放提交的任务,根据任务数量的不同分为以下三种使用情况:

  • 如果运行的线程数少于corePoolSize,executor会优先创建新的线程,而非加入队列
  • 若运行的线程数大于corePoolSize,executor会优先将任务加入队列
  • 如果队列已满,则会创建新的线程,若线程数量大于maximumPoolSize,该任务会被拒绝

拒绝策略

如果Executor被关闭或者队列已满且线程数量已达maximumPoolSize,则通过execute()提交的任务会被拒绝。发生上述情况会调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)。预定义的处理策略有如下四种:

  • ThreadPoolExecutor.AbortPolicy:默认值,抛出一个运行时异常RejectedExecutionException
  • ThreadPoolExecutor.CallerRunsPolicy: 直接在调用 execute 方法的线程执行该任务
  • ThreadPoolExecutor.DiscardPolicy:任务无法执行直接丢弃
  • ThreadPoolExecutor.DiscardOldestPolicy:若 executor 没有关闭,队列前面的任务会被丢弃,然后重试(如果失败重复上步)