Java线程池

在项目中为了提高系统性能,并发处理任务,减少线程创建的高开销,一般会用到线程池,java线程池构造如下:

通过ThreadPoolExecutor创建

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数含义

  • corePoolSize: 线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
  • handler: 线城池的饱和策略事件,主要有四种类型。

运行流程

  • 提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务。
  • 如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
  • 当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。
  • 如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。

工作队列

  • ArrayBlockingQueue:ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。插入和删除数据,只采用了一个 lock,并发效率略低于LinkedBlockingQueue。

  • LinkedBlockingQueue:LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,插入和删除分别采用了putLock和takeLock,这样可以降低线程由于无法获取到 lock 而进入 WAITING 状态的可能性,从而提高了线程并发执行的效率。吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列。

  • DelayQueue:DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列

  • PriorityBlockingQueue:PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;

  • SynchronousQueue:SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,newCachedThreadPool线程池使用了这个队列。

拒绝策略

  • AbortPolicy:中止策略,线程池默认的拒绝策略,也是我们最常用的拒绝策略。当线程池满载的时候,丢弃任务,并抛出RejectedExecutionException 异常信息。

  • DiscardPolicy:丢弃策略,一般我们都不会选择它,因为它直接就把任务丢弃掉了,我们毫无感知。如果任务不重要,丢弃掉也没有没关系,就可以使用它。

  • DiscardOldestPolicy:丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

  • CallerRunsPolicy:交给线程池调用所在的线程进行处理。

通过Executors创建

  • newFixedThreadPool:固定数目线程的线程池,核心线程数和最大线程数大小一样,没有所谓的非空闲时间,即keepAliveTime为0,阻塞队列为无界队列LinkedBlockingQueue。newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

  • newCachedThreadPool:可缓存线程的线程池,核心线程数为0,最大线程数为Integer.MAX_VALUE,阻塞队列是SynchronousQueue,非核心线程空闲存活时间为60秒。当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

  • newSingleThreadExecutor:单线程的线程池,核心线程数为1,最大线程数也为1,阻塞队列是LinkedBlockingQueue,keepAliveTime为0,适用于串行执行任务的场景,一个任务一个任务地执行。

  • newScheduledThreadPool:定时及周期执行的线程池,最大线程数为Integer.MAX_VALUE,阻塞队列是DelayedWorkQueue,keepAliveTime为0,scheduleAtFixedRate() :按某种速率周期执行,scheduleWithFixedDelay():在某个延迟后执行。适用于周期性执行任务的场景,需要限制线程数量的场景。

  • newWorkStealingPool(java8新增):基于ForkJoinPool,使用所有可用处理器作为目标并行度,创建一个窃取线程的池。如果一个线程完成了工作并且无事可做,则可以从另一线程的队列中”窃取”工作,适合于大型计算递归任务。