本文通过jdk的线程池构建一个快速消费线程池,并且采用动态代理扩展拒绝策略,使用构建者模式创建线程池。参考自如何解决JDK线程池中不超过最大线程数下快速消费任务open in new window

JDK线程池

jdk自带的线程池相关的知识参考jdk线程池

快速消费线程池

快速消费线程池,顾名思义,即在任务到来时,需要尽可能的提前消费。也就是在核心线程都处于工作时,不去将任务放入队列中,而是创建非核心线程,当所有的线程都在工作时,才加入任务队列。

要想实现快速消费线程池,就要从任务队列的offer方法入手,因为在JDK线程池ThreadPoolExecutor中的execute方法中,如果核心线程都在工作时,会先去检查任务队列是否满了,如果满了才去创建非核心线程。

我们就要从这里入手,我们设计的任务队列在核心线程满了的时候就返回false,使得线程池直接创建非核心线程。如果线程全部分配完了,我们再执行真正的offer操作。

任务队列核心代码:

public boolean offer(Runnable runnable) {
    int currentPoolThreadSize = executor.getPoolSize();
    // 如果有核心线程正在空闲,将任务加入阻塞队列,由核心线程进行处理任务
    if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(runnable);
    }
    // 当前线程池线程数量小于最大线程数,返回 False,根据线程池源码,会创建非核心线程
    if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
        return false;
    }
    // 如果当前线程池数量大于最大线程数,任务加入阻塞队列
    return super.offer(runnable);
}

在这里需要线程池维护当前正在工作的线程数,采用原子整数防止线程安全问题:

线程池核心代码:

private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public int getSubmittedTaskCount() {
    return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
    submittedTaskCount.decrementAndGet();
}
 @Override
public void execute(Runnable command) {
    submittedTaskCount.incrementAndGet();
    try {
        super.execute(command);
    }catch (RejectedExecutionException rex) {
        // 如果触发了拒绝策略,在执行拒绝策略之前,再尝试将任务放入队列一次
        TaskQueue queue = (TaskQueue) getQueue();
        try {
            if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.", rex);
            }
        } catch (InterruptedException iex) {
            submittedTaskCount.decrementAndGet();
            throw new RejectedExecutionException(iex);
        }
    }
    catch (Exception ex) {
        submittedTaskCount.decrementAndGet();
        throw ex;
    }
}

拒绝策略

在这里可以通过动态代理的思想,对现有的拒绝策略进行扩展,十分的方便。

首先要创建我们的代理类,继承自InvocationHandler,重写其中的invoke方法,在其中可以维护一些其他的信息,比如触发拒绝策略的次数:

@Slf4j
@AllArgsConstructor
public class RejectedProxyInvocationHandler implements InvocationHandler {

    /**
     * Target object
     */
    private final Object target;

    /**
     * Reject count
     */
    private final AtomicLong rejectCount;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        rejectCount.incrementAndGet();
        try {
            log.error("线程池执行拒绝策略, 此处模拟报警...");
            return method.invoke(target, args);
        } catch (InvocationTargetException ex) {
            throw ex.getCause();
        }
    }
}

代理类创建入口:

public static RejectedExecutionHandler createProxy(RejectedExecutionHandler rejectedExecutionHandler, AtomicLong rejectedNum) {
    // 动态代理模式: 增强线程池拒绝策略,比如:拒绝任务报警或加入延迟队列重复放入等逻辑
    return (RejectedExecutionHandler) Proxy
        .newProxyInstance(
        rejectedExecutionHandler.getClass().getClassLoader(),
        new Class[]{RejectedExecutionHandler.class},
        new RejectedProxyInvocationHandler(rejectedExecutionHandler, rejectedNum));
}

构建者模式

构建者模式可以很方便的帮助调用方设置参数,尤其是存在大量参数且部分参数是可选的情况。

以线程工厂举例,基于默认线程工厂,加入自定义的参数,大部分的参数都是可选的。

  • 首先创建builder静态方法,返回构建器。
  • 设置一些可选参数,创建方法,并对其赋值(可以做一些参数校验之类的),并把对象返回。
  • 创建build方法,创建实际工厂。

同样线程池也可以通过构建者创建。

测试

最后,测试一下上面的代码:

public static void main(String[] args) {
    // 通过构建者创建线程工厂
    ThreadFactory factory = ThreadFactoryBuilder.builder()
        .prefix("zzys-")
        .build();
    // 通过构建者创建快速消费线程池
    EagerThreadPoolExecutor threadPoolExecutor = EagerThreadPoolBuilder.builder()
        .corePoolSize(1)
        .maximumPoolSize(3)
        .keepAliveTime(1024,TimeUnit.SECONDS)
        // 注意要使用自定义的任务队列
        .workQueue(new TaskQueue<>(1))
        .threadFactory(factory)
        .build();
    // 采用最普通的拒绝策略
    ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
    AtomicLong rejectedNum = new AtomicLong();
    // 创建拒绝策略代理类, 代理普通的抛出异常策略
    RejectedExecutionHandler proxyRejectedExecutionHandler = RejectedProxyUtil.createProxy(abortPolicy, rejectedNum);
    threadPoolExecutor.setRejectedExecutionHandler(proxyRejectedExecutionHandler);

    for (int i = 0; i < 5; i++) {
        try {
            threadPoolExecutor.execute(() -> ThreadUtil.sleep(100000L));
            log.info("线程池中线程数目:{},队列中等待执行的任务数目:{}",
                     threadPoolExecutor.getPoolSize(), threadPoolExecutor.getQueue().size());
        } catch (Exception ignored) {
            ignored.printStackTrace();
        }
    }
    System.out.println("================ 线程池拒绝策略执行次数: " + rejectedNum.get());
}

//14:17:13.456 [main] INFO com.zzys.railway.framework.starter.common.threadpool.proxy.RejectedProxyUtil -- 线程池中线程数目:1,队列中等待执行的任务数目:0
//14:17:13.462 [main] INFO com.zzys.railway.framework.starter.common.threadpool.proxy.RejectedProxyUtil -- 线程池中线程数目:2,队列中等待执行的任务数目:0
//14:17:13.462 [main] INFO com.zzys.railway.framework.starter.common.threadpool.proxy.RejectedProxyUtil -- 线程池中线程数目:3,队列中等待执行的任务数目:0
//14:17:13.462 [main] INFO com.zzys.railway.framework.starter.common.threadpool.proxy.RejectedProxyUtil -- 线程池中线程数目:3,队列中等待执行的任务数目:1
//14:17:13.462 [main] ERROR //com.zzys.railway.framework.starter.common.threadpool.proxy.RejectedProxyInvocationHandler -- 线程池执行拒绝策略, 此处模拟报警...
//================ 线程池拒绝策略执行次数: 1
//java.util.concurrent.RejectedExecutionException: Queue capacity is full.
//...
//...

上次更新:
Contributors: YangZhang