原创

ExecutorService线程池的创建、钩子、销毁等相关操作

创建线程池

执行说明

图解

file

步骤

  • 1、创建线程池对线时候不会创建线程,当有第一个线程任务时,才会创建线程。
  • 2、如果线程数少于核心线程数时,当有新的线程来,不管之前的线程是否空闲,都会继续创建新的线程,直到达到核心线程数。
  • 3、当线程数等于核心线程数时,当有新线程来,先放入任务队列中等待核心线程执行完毕之后取出任务队列中任务执行,直到线程队列达到最大容量。
  • 4、当线程数大于队列最大容量时候,会创建非核心线程来执行任务,直到达到最大线程数(maximumPoolSize)。
  • 5、当线程数等于最大线程数时,触发线程池拒绝策略(RejectedExecutionHandler)。

例子

corePoolSize为5,maximumPoolSize为10,workQueue的size为100。
当请求来时,最多创建5个核心线程来执行任务,剩下的被添加到任务队列,当队列慢了后会创建非核心线程执行任务直到10个,再来任务就拒绝。

构造函数

ExecutorService executorService = new ThreadPoolExecutor(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler);

参数说明:

  • corePoolSize:核心线程数。一旦创建,就不会被释放销毁。
  • maximumPoolSize:最大线程数。线程池允许创建的最大线程数量。
  • keepAliveTime:线程空闲时间。达到这个时间非核心线程将会被回收销毁。(allowCoreThreadTimeOut修改这个属性为ture,核心线程也会被回收)
  • unit:线程空闲时间单位。
  • workQueue:任务队列。当任务对大于核心线程时候,就会被放到这个队列中等待。
  • threadFactory:线程工厂。创建新线程时,通过线程工厂来创建。
  • handler:线程池拒绝策略。

workQueue任务队列

SynchronousQueue:同步阻塞单一队列

该队列put之后会阻塞当前线程,阻塞到等有别的线程来拿走。同步队列没有任何内部容量。不要使用add,因为这个队列内部没有任何容量,所以会抛出异常“IllegalStateException”。翻译一下:这是一个内部没有任何容量的阻塞队列,任何一次插入操作的元素都要等待相对的删除/读取操作,否则进行插入操作的线程就要一直等待,反之亦然。

不需要队列存储的时候就会使用这种,比如Executors.newCachedThreadPool()。

示例:

package com.weilai;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2019-12-02 10:53
 */
public class ThreadLocalTest {

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // 不要使用add,因为这个队列内部没有任何容量,所以会抛出异常“IllegalStateException”
        // queue.add(new Object());
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.submit( () -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String poll = queue.poll();
            System.out.println(poll);
        });
        try {
            System.out.println(1);
            // 操作线程会在这里被阻塞,直到有其他操作线程取走这个对象
            queue.put("hello");
            System.out.println(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:输出1,3秒后输出hello、2

1
hello
2

ArrayBlockingQueue:数组实现的有界阻塞队列

  • 该队列put到队列的capacity容量之后阻塞当前线程,阻塞到等有别的线程来拿走(take)时。
  • fair为true是采用FIFO(先进先出)方式,fair为false时采用非公平锁。

示例:

package com.weilai;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2019-12-02 10:53
 */
public class ThreadLocalTest {

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2, true);
        // 不要使用add,因为这个队列内部没有任何容量,所以会抛出异常“IllegalStateException”
        // queue.add(new Object());
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.submit( () -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String poll = queue.poll();
            System.out.println(poll);
            poll = queue.poll();
            System.out.println(poll);
            poll = queue.poll();
            System.out.println(poll);
        });
        try {
            System.out.println(1);
            queue.put("hello1");
            queue.put("hello2");
            System.out.println(2);
            // 操作线程会在这里被阻塞,直到有其他操作线程取走这个对象
            queue.put("hello3");
            System.out.println(3);
            queue.put("hello4");
            System.out.println(4);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:先输出1、2,然后put(hello3)阻塞,取出hello1时队列数量小与等于capacity,唤醒当前线程输出3,并放入hello4阻塞当前线程,子线程输出hello2、hello3,队列数量小与等于capacity,唤醒当前线程输出4。

1
2
hello1
3
hello2
hello3
4

LinkedBlockingQueue:链表实现的无界阻塞队列

由于是无界限的队列,所以核心线程数就是这个线程池可创建的最大线程数量。

LinkedBlockingDeque:链表实现的无界双向阻塞队列

与上面区别就是他是个双向列表。

PriorityBlockingQueue:按优先级排列好的有届阻塞队列。

队列中的元素必须实现Comparable接口,通过compareTo方法实现比较。此队列指保证优先级最高的在队列头部,并不保证相同元素的位置,也不保证除了最高优先级之外元素的排序。

LinkedTransferQueue:链表实现的生产/消费模式队列

具有一般队列的操作特性外(先进先出),还具有一个阻塞特性:LinkedTransferQueue可以由一对生产者/消费者线程进行操作,当生产者将一个新的元素插入队列后,生产者线程将会一直等待,直到某一个消费者线程将这个元素取走,反之亦然。

DelayedWokQueue:延时队列

Executors.newScheduledThreadPool()就只用这种队列来实现。

handler线程池拒绝策略

触发情况:

  • 线程池被终止时,有新任务投递会触发拒绝策略。
  • 线程池使用有界限队列最大线程数量达到最大maximumPoolSize时,有新任务投递会触发拒绝策略。

ThreadPoolExecutor.AbortPolicy

直接抛出异常。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

ThreadPoolExecutor.CallerRunsPolicy

当线程池未关闭时,在调用者的线程中执行该任务。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

ThreadPoolExecutor.DiscardPolicy

拒绝处理,直接丢弃该任务。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

ThreadPoolExecutor.DiscardOldestPolicy

当线程池未关闭时,直接丢弃最老的任务,也就是最前面存在时间最长的那个。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

常见特点说明

  • 1、corePoolSize与maximumPoolSize相等,可以固定线程池的大小。
  • 2、maximumPoolSize值越来,理论上可以处理的并发越多,因为可以创建很多非核心线程。
  • 3、如果使用无界队列(LinkedBlockQueue),线程池中线程数就不会超过corePoolSize。

线程池中线程数量建议

  • CPU密集型(经常复杂计算):CPU核心数量的1~2倍。
  • IO耗时型:可以大于CPU核心数很多倍。可以以JVM线程监控显示繁忙情况为依据。
  • 参考线程数=CPU核心数 * (1 + 平均等待时间 / 平均工作时间)。
  • 压测。

销毁线程池

通过创建线程池返回的ExecutorService来操作

void shutdown();

该方法不会立刻停止线程池,会等待线程中的任务和队列中的任务执行完毕后销毁,调用之后会拒绝新任务的投递

List shutdownNow();

该方法会立刻停止线程池,调用之后会拒绝新任务的投递,返回未执行的任务列表。线程任务会抛出InterruptedException异常

boolean isShutdown();

该方法返回当前线程池状态是否是停止状态(调用过shutdown)

boolean isTerminated();

该方法返回当前线程是否是完全执行完状态

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

等待一定时间后检测线程池是否是完全执行完状态

钩子

重写ThreadPoolExecutor的beforeExecute、afterExecute、terminated等方法,可以实现线程执行前后的相关操作。

实现可暂停的线程池

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/3/16 6:31 下午
 */
public class PauseThreadPollExecutor extends ThreadPoolExecutor {

    private ReentrantLock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    private boolean isPaused;


    public PauseThreadPollExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseThreadPollExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseThreadPollExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseThreadPollExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        System.out.println("执行完成");
    }

    public void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PauseThreadPollExecutor executor = new PauseThreadPollExecutor(5, 5, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = () -> {
            System.out.println("执行");
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 1000; i++) {
            executor.execute(runnable);
        }
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("暂停");
        executor.pause();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("恢复");
        executor.resume();
    }
}
正文到此结束
本文目录