# 线程池简单说明

线程池其实就是一种对象池,用来管理线程资源

在执行任务之前,需要把线程从线程池拿出来

在任内结束后,需要把线程放回线程池里去

通过这种方式,可以有效避免直接创建线程所带来的问题

线程池主要的流程图:

流程图


使用线程池可以带来的好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

# 构造方法

ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor 有 4 个构造方法,参数说明:

  • corePoolSize(必须):线程池核心线程数量
  • maximumPoolSize(必须):线程池最大线程数量。
  • keepAliveTime(必须):当活跃线程数大于核心线程数时,空闲的多余线程最大存活时间
  • unit(必须):keepAliveTime 存活时间的单位
  • workQueue(必须):任务队列。
  • threadFactory(可选):线程工厂。指定线程池创建线程的方式。
  • handler(可选):拒绝策略,超出线程范围和队列容量的任务的处理程序。

  • corePool : 核心线程池
  • maximumPool : 线程池
  • BlockQueue :队列
  • RejectedExecutionHandler : 拒绝策略
    • AbortPolicy(默认):直接抛出 RejectedExecutionException
    • CallerRunsPolicy :在当前线程中执行
    • DiscardPolicy:直接丢弃线程
    • DiscardOldestPolicy:丢弃一个未被处理的最久的线程,然后重试

# 任务队列

ThreadPoolExecutor 线程池的 API 文档中,推荐了三种等待队列: SynchronousQueueLinkedBlockingQueueArrayBlockingQueue

  1. SynchronousQueue(默认):同步队列,无内部容量的阻塞队列,在执行入队时,只有遇上其他线程出队才能成功入队,否则则进行等待。
  2. LinkedBlockingQueue:无界队列(其实有上限,上限为 Integer.MAX_VALUE ,可以指定大小,未指定则默认为上限),一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序,这种队列可以提高线程池吞吐量,但代价是牺牲内存空间,甚至会导致内存溢出。
  3. ArrayBlockingQueue:有界队列,基于数组实现。在线程池初始化时,指定队列的容量,后续无法再调整。这种有界队列有利于防止资源耗尽,但可能更难调整和控制。

Java 还有另外的队列:

  1. PriorityBlockingQueue优先级无界阻塞队列 ,存放在此队列的元素必须实现 Comparable 接口,这样才能使用 compareTo () 进行排序。优先级最高的元素总是在队头。
  2. DelayQueue:延迟队列,基于二叉堆实现,同时具备:无界队列、阻塞队列、优先队列的特征。存放的对象,必须是实现 Delayed 接口的类对象。
  3. LinkedBlockingDeque:双端队列。基于链表实现,既可以从尾部插入 / 取出元素,还可以从头部插入元素 / 取出元素。
  4. LinkedTransferQueue:由链表结构组成的无界阻塞队列。

# 线程工厂

线程工厂是指定创建线程的方式。

Executors 提供了一个默认的线程工厂

DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

在大部分的时候,我们只需要定义线程名;也可以根据不同的需求自己去实现 ThreadFactory () 接口。

示例:

最简单的线程工厂
public class SimpleThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r);
    }
}
// 只是创建了一个新线程,其他什么都没干。一般不会这么简单

# execute()

ThreadPoolExecutor的execute()方法
public void execute(Runnable command) {
        // 任务为空,抛出 NPE
        if (command == null)
            throw new NullPointerException();
        // 获取当前工作线程数和线程池运行状态(共 32 位,前 3 位为运行状态,后 29 位为运行线程数)
        int c = ctl.get();
        // 如果当前工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 如果当前工作线程数小于核心线程数
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果当前线程池状态为 RUNNING,并且任务成功添加到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            // 双重检查,因为从上次检查到进入此方法,线程池可能已成为 SHUTDOWN 状态
            int recheck = ctl.get();
            // 如果当前线程池状态不是 RUNNING 则从队列删除任务
            if (! isRunning(recheck) && remove(command))
                // 执行拒绝策略
                reject(command);
            // 当线程池中的 workerCount 为 0 时,此时 workQueue 中还有待执行的任务,则新增一个 addWorker,消费 workqueue 中的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 阻塞队列已满才会走下面的逻辑
        // 尝试增加工作线程执行 command
        else if (!addWorker(command, false))
            // 如果当前线程池为 SHUTDOWN 状态或者线程池已饱和
       		reject(command); // 执行拒绝策略
            
    }

addWorker()方法
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // 循环退出标志位
    for (;;) { // 无限循环
        int c = ctl.get();
        int rs = runStateOf(c); // 线程池状态
        // 仅在必要时检查队列是否为空.
        if (rs >= SHUTDOWN && 
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 
            // 等价于 (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty ())
           )
           // 返回 false 的条件就可以分解为:
           //(1)线程池状态为 STOP,TIDYING,TERMINATED
           //(2)线程池状态为 SHUTDOWN,且要执行的任务不为空
           //(3)线程池状态为 SHUTDOWN,且任务队列为空
            return false;
        //cas 自旋增加线程个数
        for (;;) {
            int wc = workerCountOf(c); // 当前工作线程数
            // 工作线程数 >= 线程池容量 || 工作线程数 >=(核心线程数 || 最大线程数)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            if (compareAndIncrementWorkerCount(c)) // 执行 cas 操作,添加线程个数
                break retry; // 添加成功,退出外层循环
            // 通过 cas 添加失败
            c = ctl.get();  
            // 线程池状态是否变化,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新 cas
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 简单总结上面的 CAS 过程:
    //(1)内层循环作用是使用 cas 增加线程个数,如果线程个数超限则返回 false,否者进行 cas
    //(2)cas 成功则退出双循环,否者 cas 失败了,要看当前线程池的状态是否变化了
    //(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行 cas
    // 走到这里说明 cas 成功,线程数 + 1,但并未被执行
    boolean workerStarted = false; // 工作线程调用 start () 方法标志
    boolean workerAdded = false; // 工作线程被添加标志
    Worker w = null;
    try {
        w = new Worker(firstTask); // 创建工作线程实例
        final Thread t = w.thread; // 获取工作线程持有的线程实例
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock; // 使用全局可重入锁
            mainLock.lock(); // 加锁,控制并发
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // 获取当前线程池状态
                // 线程池状态为 RUNNING 或者(线程池状态为 SHUTDOWN 并且没有新任务时)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 检查线程是否处于活跃状态
                        throw new IllegalThreadStateException();
                    workers.add(w); // 线程加入到存放工作线程的 HashSet 容器,workers 全局唯一并被 mainLock 持有
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock(); //finally 块中释放锁
            }
            if (workerAdded) { // 线程添加成功
                t.start(); // 调用线程的 start () 方法
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted) // 如果线程启动失败,则执行 addWorkerFailed 方法
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorkerFailed()
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w); // 线程启动失败时,需将前面添加的线程删除
        decrementWorkerCount(); //ctl 变量中的工作线程数 - 1
        tryTerminate(); // 尝试将线程池转变成 TERMINATE 状态
    } finally {
        mainLock.unlock();
    }
}

tryTerminate()
final void tryTerminate() {
    for (;;) {// 无限循环
        int c = ctl.get();
        // 以下情况不会进入 TERMINATED 状态:
        //(1)当前线程池为 RUNNING 状态
        //(2)在 TIDYING 及以上状态
        //(3)SHUTDOWN 状态并且工作队列不为空
        //(4)当前活跃线程数不等于 0
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // 工作线程数!=0
            interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的线程
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 通过 CAS 自旋判断直到当前线程池运行状态为 TIDYING 并且活跃线程数为 0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // 调用线程 terminated ()
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为 TERMINATED,工作线程数为 0
                    termination.signalAll(); // 通过调用 Condition 接口的 signalAll () 唤醒所有等待的线程
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

# 线程池中的任务去向说明

向线程池提交任务时由线程池的状态线程池中的线程数来决定任务的去向。

ThreadPoolExecutor 内部使用了一个 AtomicInteger 类型的整数 ctl 来表示这两个参数。

线程池的状态
volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

runState 表示当前线程池的状态,是一个 volatile 变量 ,用来保证线程之间的可见性。

  • RUNNING:创建线程池后,初始时线程池处于 RUNNING 状态
  • SHUTDOWN:调用 shutdown () 方法后,线程池处于 SHUTDOWN 状态,此时线程池不能接受新的任务,要等待所有的任务执行完毕。
  • STOP:调用 shutdownNow () 方法,则线程池处于 STOP 状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务。
  • TERMINATED:当线程池处于 SHUTDOWN STOP 状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为 TERMINATED 状态。


ctl
public class ThreadPoolExecutor extends AbstractExecutorService {
    // Integer.SIZE = 32. 所以 COUNT_BITS= 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 1 右移 29 位然后减 1
    // 00011111 11111111 11111111 11111111 这个值可以表示线程池的最大线程容量
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // 将 - 1 左移 29 位得到 RUNNING 状态的值
    private static final int RUNNING    = -1 << COUNT_BITS;    
    // 线程池运行状态和线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

# 线程池的使用

使用线程池
import java.util.concurrent.*;
public class ThreadPool_test{
    public static void main(String[] args) {
		// 创建线程池,参数:核心线程数,最大线程数,存活时间数,存活时间数的单位,任务队列
		ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 5, 5, TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(5));
		// 向线程池提交任务
		for (int i = 0; i < threadPool.getCorePoolSize(); i++) {
			threadPool.execute(new Runnable() {
				@Override
				public void run() {
					for (int x = 0; x < 2; x++) {
						System.out.println(Thread.currentThread().getName() + ":" + x);
						try {
							Thread.sleep(5000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			});
		}
		// 关闭线程池
		threadPool.shutdown(); // 设置线程池的状态为 SHUTDOWN,然后中断所有没有正在执行任务的线程
		//threadPool.shutdownNow (); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
	}
}

输出结果
pool-1-thread-1:0
pool-1-thread-4:0
pool-1-thread-3:0
pool-1-thread-2:0
pool-1-thread-4:1
pool-1-thread-1:1
pool-1-thread-3:1
pool-1-thread-2:1

# Executors 封装好了的功能线程池

封装好的线程池
// 创建单一线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创建带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创建定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创建工作窃取线程池
public static ExecutorService newWorkStealingPool();

# 单一线程池 SingleThreadExecutor

特点是线程池中只有一个线程(核心线程),线程执行完任务立即回收,使用有界阻塞队列(容量未指定,使用默认值 Integer.MAX_VALUE ,当线程空闲的时候,按照 FIFO 的方式进行处理。

// 使用的都是默认的线程工厂,当然可以制定线程工厂并加入参数中
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

使用示例
// 1. 创建单线程线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 创建 Runnable(任务)
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->运行");
  }
};
// 3. 向线程池提交任务
singleThreadExecutor.execute(task);

# 固定容量线程池 FixedThreadPool

特点是最大线程数就是核心线程数,意味着线程池只能创建核心线程keepAliveTime 为 0,即线程执行完任务立即回收。任务队列未指定容量,代表使用默认值 Integer.MAX_VALUE 。多个任务被提交到此,处理过程:

  1. 当线程数量小于指定线程数时,则创建线程来执行任务
  2. 当线程数等于指定的线程数时,并且有线程是空闲的,则取出空闲线程执行任务
  3. 如果没有线程是空闲的,则将任务缓存到队列(队列长度为 Integer.MAX_VALUE )。当线程空闲的时候,按照 FIFO 的方式进行处理
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

示例
// 1. 创建线程池对象,设置核心线程和最大线程数为 5
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 2. 创建 Runnable(任务)
Runnable task =new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->运行");
  }
};
// 3. 向线程池提交任务
fixedThreadPool.execute(task);

# 带缓存的线程池 CachedThreadPool

没有核心线程,普通线程数量为 Integer.MAX_VALUE ,任务队列使用 SynchronousQueue 这种无容量的同步队列,

CachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

使用示例
// 1. 创建缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 创建 Runnable(任务)
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->运行");
  }
};
// 3. 向线程池提交任务
cachedThreadPool.execute(task);

# 定时调度线程池 ScheduledThreadPool

指定核心线程数量,普通线程数量 Integer.MAX_VALUE ,线程执行完任务立即回收,任务队列为延时阻塞队列。这是一个比较特别的线程池,它返回的是 ScheduledThreadPoolExecutor 类型的线程池。

ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 继承了 ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
	public ScheduledThreadPoolExecutor(int corePoolSize) {
    	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          	new DelayedWorkQueue());
	}
	
	// 延时执行任务
	public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        ...
    }
	// 定时执行任务
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
}

使用示例
// 1. 创建定时线程池
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 创建 Runnable(任务)
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->运行");
  }
};
// 3. 向线程池提交任务
scheduledThreadPool.schedule(task, 2, TimeUnit.SECONDS); // 延迟 2s 后执行任务
scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);// 延迟 50ms 后、每隔 2000ms 执行任务

# 工作窃取线程池 WorkStealingPool

通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

# 使用场景

  1. CachedThreadPool:适用于服务器负载较轻,执行很多短期异步任务。
  2. FixedThreadPool:适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
  3. SingleThreadExecutor不会有多个线程是活动的场景。
  4. ScheduledThreadPool:适用于执行定时或周期性的任务
  5. WorkStealingPool:适用于大耗时的操作,可以并行来执行

参考:

云深不知处博客_深入 java 线程池

Java 线程池实现原理及其在美团业务中的实践 - 美团技术团队

Java 线程池的使用 - 简书