百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

克服 Java 枚举陷阱:线程池队列共享问题的解决之道

wptr33 2025-09-19 03:55 2 浏览

引言

2025年已开始,也很久没有更新文章了。是因为前一段时间一直在忙不同的事情。现在总结前一段时间遇到的一个深坑。今天遇到的坑关于线程池中时使用共享任务队列导致并发问题。

正文

概述

在实际的多线程开发中,合理设计线程池是提升系统性能和任务管理的重要手段。然而,如果线程池的设计不够严谨,可能会引发一些隐晦的问题,尤其是线程池之间的隔离性和任务管理的问题。本文将结合上述代码,深入分析 线程池共享 BlockingQueue 问题的原因、排查过程及解决方案。

背景

在多线程开发中,线程池通常由以下关键元素组成:

  • 核心线程池大小(corePoolSize) 和 最大线程池大小(maxPoolSize)。
  • 任务队列(BlockingQueue):用于存放等待执行的任务。
  • 线程工厂(ThreadFactory):用于生成线程,并为线程命名。
  • 拒绝策略(RejectedExecutionHandler):当任务无法提交到线程池时的处理方式。

正常线程池流程图:



在实际开发中,会通过配置这些参数来适应不同的业务需求。例如,下面是写的一个线程工厂类。

package com.dereksmart.crawling.spring.util;


import com.dereksmart.crawling.util.string.StringUtil;

import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.Map;
import java.util.concurrent.*;

/**
 * @Author derek_smart
 * @Date 2025/01/16 7:55
 * @Description 线程工厂
 */
public class ThreadPool {

    private static final ConcurrentHashMap<PoolType, ConcurrentHashMap<String, ThreadPoolExecutor>> POOL_CACHE
            = new ConcurrentHashMap<>();

    public static ThreadPoolExecutor getSingleThreadPool(String key) {
        return getCommonPool(key, PoolType.AUTO_TEST);
    }

    private static ThreadPoolExecutor getCommonPool(String code, PoolType poolType) {
        if (StringUtil.isEmpty(code)) {
            code = "defaultCode";
        }
        // 使用 computeIfAbsent 来确保原子性操作
        ConcurrentHashMap<String, ThreadPoolExecutor> poolMap = POOL_CACHE.computeIfAbsent(poolType, k -> new ConcurrentHashMap<>());

        String finalCode = code;
        return poolMap.computeIfAbsent(code, k -> {
            int corePoolSize = 1; // 如果需要动态获取corePoolSize,可以在这里添加逻辑
            return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0, TimeUnit.SECONDS,
                    poolType.getQueue(),
                    new CustomizableThreadFactory(String.format(poolType.getFactoryNameFormat(), finalCode)),
                    poolType.getRejectedExecutionHandler());
        });
    }

    enum PoolType {
        AUTO_TEST("自动测试", "AUTO-TEST-%s-",
                null, new ThreadPoolExecutor.DiscardOldestPolicy(),
                new LinkedBlockingQueue<>(1)),
        ;

        /**
         * 功能描述
         */
        private String desc;
        /**
         * 线程工厂名称
         */
        private String factoryNameFormat;

        /**
         * 核心线程数,系统参数
         */
        private String poolSize;

        private RejectedExecutionHandler rejectedExecutionHandler;

        private BlockingQueue<Runnable> queue;

        PoolType(String desc, String factoryNameFormat, String poolSize,
                 RejectedExecutionHandler rejectedExecutionHandler, BlockingQueue<Runnable> queue) {
            this.desc = desc;
            this.factoryNameFormat = factoryNameFormat;
            this.poolSize = poolSize;
            this.rejectedExecutionHandler = rejectedExecutionHandler;
            this.queue = queue;
        }

        public String getDesc() {
            return desc;
        }

        public String getFactoryNameFormat() {
            return factoryNameFormat;
        }

        public String getPoolSize() {
            return poolSize;
        }

        public RejectedExecutionHandler getRejectedExecutionHandler() {
            return rejectedExecutionHandler;
        }

        public BlockingQueue<Runnable> getQueue() {
            return queue;
        }
    }
    
}

上述代码本身是想通过单一线程进行根据code类型进行并发控制,防止同一个的code下面同时进行计算,但是在项目运行发现,居然有多个线程同时执行。本想通过AUTO-TEST-*** 通过线程号轻松判断从哪个code 在执行任务,结果打印日志乱套了。debug时候,发现最后获取的线程就是对应code,但是到了submit任务时候打印当前的任务号就是各种各样的。

故此上述代码通过枚举 PoolType 来管理线程池的配置,并为每种类型的线程池指定了特定的队列、工厂和拒绝策略。

问题的核心在于 PoolType 中的 BlockingQueue,它是静态初始化的,并且所有线程池实例都共享同一个队列。这种设计虽然看似简单,但会在多线程环境下引发潜在问题。

共享任务队列流程图:

问题描述

代码片段

以下是问题代码的核心片段:

enum PoolType {
    AUTO_TEST("自动测试", "AUTO-TEST-%s-",
              null, new ThreadPoolExecutor.DiscardOldestPolicy(),
              new LinkedBlockingQueue<>(1)),
    ;

    private String desc;
    private String factoryNameFormat;
    private String poolSize;
    private RejectedExecutionHandler rejectedExecutionHandler;
    private BlockingQueue<Runnable> queue;

    PoolType(String desc, String factoryNameFormat, String poolSize,
             RejectedExecutionHandler rejectedExecutionHandler, BlockingQueue<Runnable> queue) {
        this.desc = desc;
        this.factoryNameFormat = factoryNameFormat;
        this.poolSize = poolSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        this.queue = queue;
    }

    public BlockingQueue<Runnable> getQueue() {
        return queue;
    }
}

在这个代码中:

每个 PoolType 枚举实例(如 AUTO_TEST)在类加载时会被初始化。 BlockingQueue 是在枚举构造时创建的静态实例:

new LinkedBlockingQueue<>(1)

因此,AUTO_TEST 所有线程池实例都共用同一个 BlockingQueue。

Java 的枚举是单例的。每个枚举常量(如 AUTO_TEST)在 JVM 的生命周期中只会被实例化一次。 因此,
PoolType.AUTO_TEST.getQueue() 无论调用多少次,都会返回同一个 LinkedBlockingQueue 实例。

问题现象

假设通过以下代码创建了两个线程池:

ThreadPoolExecutor pool1 = ThreadPool.getSingleThreadPool("key1");
ThreadPoolExecutor pool2 = ThreadPool.getSingleThreadPool("key2");

由于 pool1 和 pool2 都基于 PoolType.AUTO_TEST 创建,它们共享同一个队列。会导致以下问题:

任务混乱:

任务 A 被提交到 pool1 的队列,但可能会被 pool2 的线程取走并执行。 不同线程池之间的任务处理相互干扰,逻辑混乱。

任务丢失:

当队列已满时,多个线程池可能对同一个队列并发操作,导致任务被拒绝或覆盖。 某些任务被无意删除,无法被任何线程池处理。

线程池隔离性丧失:

理论上,每个线程池应该有独立的队列,互不干扰。但由于队列共享,不同线程池的任务管理存在耦合性,隔离性丧失。

排查过程

面对上述问题,可以通过以下步骤进行排查:

1.检查线程池的任务行为 首先,观察任务提交到线程池后的运行情况。通过打印日志,检查任务是否被提交到预期的线程池中执行。

例如,在任务中打印当前线程名称和任务来源:

Runnable task = () -> {
    System.out.println("Running task from pool: " + Thread.currentThread().getName());
};

如果观察到同一个线程处理了来自不同线程池的任务,就可以确定队列被共享了。

  1. 检查线程池的 BlockingQueue 实例 通过调试代码,检查线程池中绑定的 BlockingQueue 是否是同一个实例。可以通过以下方式验证:
System.out.println("Queue instance for pool1: " + pool1.getQueue().hashCode());
System.out.println("Queue instance for pool2: " + pool2.getQueue().hashCode());

如果输出的哈希值相同,说明两个线程池共享了同一个 BlockingQueue。

  1. 分析 PoolType 枚举的初始化逻辑 深入查看 PoolType 的实现,发现 queue 是在枚举实例化时直接赋值的静态对象。由于枚举实例是全局唯一的,这就导致了共享问题。

解决方案

针对上述问题,可以采用以下解决方案。

方案 1:为每个线程池生成独立的队列

每次创建线程池时,为其生成一个新的 BlockingQueue 实例,而不是直接使用 PoolType 中的 queue。

修改方法 在 getCommonPool 方法中,动态创建队列:

private static ThreadPoolExecutor getCommonPool(String code, PoolType poolType) {
    if (StringUtil.isEmpty(code)) {
        code = "defaultCode";
    }

    ConcurrentHashMap<String, ThreadPoolExecutor> poolMap = POOL_CACHE.computeIfAbsent(poolType, k -> new ConcurrentHashMap<>());

    String finalCode = code;
    return poolMap.computeIfAbsent(code, k -> {
        int corePoolSize = 1; // 如果需要动态获取corePoolSize,可以在这里添加逻辑
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1); // 动态创建队列
        return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0, TimeUnit.SECONDS,
                queue,
                new CustomizableThreadFactory(String.format(poolType.getFactoryNameFormat(), finalCode)),
                poolType.getRejectedExecutionHandler());
    });
}

优点: 每个线程池有独立的队列,避免了共享问题。 简单易行,修改范围小。

方案 2:通过工厂方法动态生成队列

将 PoolType 的 queue 字段替换为一个工厂方法,每次调用 getQueue() 时生成一个新的队列。

修改枚举

java
enum PoolType {
    AUTO_TEST("自动测试", "AUTO-TEST-%s-",
              null, new ThreadPoolExecutor.DiscardOldestPolicy(),
              () -> new LinkedBlockingQueue<>(1)), // 工厂方法替代静态队列
    ;

    private String desc;
    private String factoryNameFormat;
    private String poolSize;
    private RejectedExecutionHandler rejectedExecutionHandler;
    private QueueFactory queueFactory;

    PoolType(String desc, String factoryNameFormat, String poolSize,
             RejectedExecutionHandler rejectedExecutionHandler, QueueFactory queueFactory) {
        this.desc = desc;
        this.factoryNameFormat = factoryNameFormat;
        this.poolSize = poolSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        this.queueFactory = queueFactory;
    }

    public BlockingQueue<Runnable> getQueue() {
        return queueFactory.create();
    }
}

@FunctionalInterface
public interface QueueFactory {
    BlockingQueue<Runnable> create();
}

优点 灵活性更高,可以根据需求动态调整队列类型和容量。 保证线程池隔离性。

方案 3:限制每个 PoolType 只创建一个线程池

如果业务允许,可以通过限制每个 PoolType 只能创建一个线程池,避免队列共享问题。

修改 POOL_CACHE 结构 将 POOL_CACHE 的结构从双层 ConcurrentHashMap 改为单层:

private static final ConcurrentHashMap<PoolType, ThreadPoolExecutor> POOL_CACHE = new ConcurrentHashMap<>();

private static ThreadPoolExecutor getCommonPool(PoolType poolType) {
    return POOL_CACHE.computeIfAbsent(poolType, k -> {
        int corePoolSize = 1;
        return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0, TimeUnit.SECONDS,
                poolType.getQueue(),
                new CustomizableThreadFactory(poolType.getFactoryNameFormat()),
                poolType.getRejectedExecutionHandler());
    });
}

优点 简化代码逻辑。 确保每个 PoolType 只有一个线程池实例,避免共享问题。 缺点 如果需要为同一个 PoolType 创建多个线程池,就无法满足需求。

总结

在多线程开发中,队列共享问题可能导致任务混乱、任务丢失以及线程池隔离性丧失。通过分析代码,可以发现问题的根源在于 BlockingQueue 的静态实例化。通过动态生成独立队列或限制线程池实例数量,可以有效解决此问题。

推荐在需要多个线程池实例的场景下使用 方案 1 或 2,而在简单场景下可以使用 方案 3 简化设计。

相关推荐

高性能并发队列Disruptor使用详解

基本概念Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS,能够在无锁的情况下实现队列的并发操作Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队列....

Disruptor一个高性能队列_java高性能队列

Disruptor一个高性能队列前言说到队列比较熟悉的可能是ArrayBlockingQueue、LinkedBlockingQueue这两个有界队列,大多应用在线程池中使用能保证线程安全,但其安全性...

谈谈防御性编程_防御性策略

防御性编程对于程序员来说是一种良好的代码习惯,是为了保护自己的程序在不可未知的异常下,避免带来更大的破坏性崩溃,使得程序在错误发生时,依然能够云淡风轻的处理,但很多程序员入行很多年,写出的代码依然都是...

有人敲门,开水开了,电话响了,孩子哭了,你先顾谁?

前言哎呀,这种情况你肯定遇到过吧!正在家里忙活着,突然——咚咚咚有人敲门,咕噜咕噜开水开了,铃铃铃电话响了,哇哇哇孩子又哭了...我去,四件事一起来,人都懵了!你说先搞哪个?其实这跟我们写Java多线...

面试官:线程池如何按照core、max、queue的执行顺序去执行?

前言这是一个真实的面试题。前几天一个朋友在群里分享了他刚刚面试候选者时问的问题:"线程池如何按照core、max、queue的执行循序去执行?"。我们都知道线程池中代码执行顺序是:co...

深入剖析 Java 中线程池的多种实现方式

在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨Java中...

并发编程之《彻底搞懂Java线程》_java多线程并发解决方案详解

目录引言一、核心概念:线程是什么?...

Redis怎么实现延时消息_redis实现延时任务

一句话总结Redis可通过有序集合(ZSET)实现延时消息:将消息作为value,到期时间戳作为score存入ZSET。消费者轮询用ZRANGEBYSCORE获取到期消息,配合Lua脚本保证原子性获取...

CompletableFuture真的用对了吗?盘点它最容易被误用的5个场景

在Java并发编程中,CompletableFuture是处理异步任务的利器,但不少开发者在使用时踩过这些坑——线上服务突然雪崩、异常悄无声息消失、接口响应时间翻倍……本文结合真实案例,拆解5个最容易...

接口性能优化技巧,有点硬_接口性能瓶颈

背景我负责的系统到2021年初完成了功能上的建设,开始进入到推广阶段。随着推广的逐步深入,收到了很多好评的同时也收到了很多对性能的吐槽。刚刚收到吐槽的时候,我们的心情是这样的:...

禁止使用这5个Java类,每一个背后都有一段&quot;血泪史&quot;

某电商平台的支付系统突然报警:大量订单状态异常。排查日志发现,同一笔订单被重复支付了三次。事后复盘显示,罪魁祸首竟是一行看似无害的SimpleDateFormat代码。在Java开发中,这类因使用不安...

无锁队列Disruptor原理解析_无锁队列实现原理

队列比较队列...

Java并发队列与容器_java 并发队列

【前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一。本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark、Storm等源码的,...

线程池工具及拒绝策略的使用_线程池处理策略

线程池的拒绝策略若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。...

【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别?

有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准...