第10章-Executor框架

本章介绍了Executor框架的整体结构和成员组件,并以ThreadPoolExecuto、ScheduledThreadPoolExecuto、FutureTask三个核心成员类为例来剖析内部原理。

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开
来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

Executor框架简介

Executor框架的两级调度模型

应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
Alt text

Executor框架的结构与成员

  1. Executor框架主要由3大部分组成如下
  • 任务:Runnable接口或Callable接口。
  • 任务的执行:Executor、ExecutorServic
  • 异步计算的结果:FutureTask

Alt text

  1. Executor框架的成员
    本节将介绍Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。
  • ThreadPoolExecutor
    • FixedThreadPoo
    • SingleThreadExecuto
    • CachedThreadPoo
  • ScheduledThreadPoolExecutor
    • ScheduledThreadPoolExecutor
    • SingleThreadScheduledExecutor
  • Future接口
  • Runnable接口和Callable接口

ThreadPoolExecutor详解

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成。

  • corePool:核心线程池的大小。
  • maximumPool:最大线程池的大小。
  • BlockingQueue:用来暂时保存任务的工作队列。
  • RejectedExecutionHandler:达到了最大线程池大小且工作队列已满,execute()方法将要调用的Handler。
    通过Executor框架的工具类Executors,可以创建3种类型的ThreadPoolExecutor。

FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现。

[案例]
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源代码实现。

[案例]
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThreadPool的源代码。

[案例]
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。

ScheduledThreadPoolExecutor的运行机制

Alt text

ScheduledThreadPoolExecutor的实现

ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中。ScheduledFutureTask主要包含3个成员变量,如下。

  • long型成员变量time,表示这个任务将要被执行的具体时间。
  • long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
  • long型成员变量period,表示任务执行的间隔周期。

任务执行步骤:
Alt text

FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

FutureTask简介

FutureTask除了实现Future接口外,还实现了Runnable接口。
状态迁移的示意图:
Alt text

执行示意图:
Alt text

FutureTask的使用

[案例]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final ConcurrentMap<Object, Future<String>> taskCache =
new ConcurrentHashMap<Object, Future<String>>();
private String executionTask(final String taskName)
throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName);// 1.1,2.1
if (future == null) {
Callable<String> task = new Callable<String>() {
public String call() throws InterruptedException {
return taskName;
}
};
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
if (future == null) {
future = futureTask;
futureTask.run(); // 1.4执行任务
}
}
try {
return future.get();// 1.5,2.2
} catch (CancellationException e) {
taskCache.remove(taskName, future);
}
}
}

代码的执行示意图:
Alt text

FutureTask的实现

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。
设计示意图:
Alt text

级联唤醒示意图:
Alt text