前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >完蛋了,线程池死锁,生产出Bug了

完蛋了,线程池死锁,生产出Bug了

作者头像
xiaoyi
发布2024-04-10 15:58:13
860
发布2024-04-10 15:58:13
举报
文章被收录于专栏:小义思小义思

生产环境系统excel报表导不出,挨客户投诉,内心慌得一批,赶紧查看日志,结果发现是线程池死锁,这锅真是湿手捏干面,不背也得背了。

遇事不要慌,来杯82年的java压压惊。还好之前做了开关配置,小义赶紧切换开关恢复旧页面,先解决客户问题。排查了半天日志,原来是因为父子任务共用同个线程池,造成循环依赖,直接堵死了导出请求。

下面来模拟一下当时的导出场景,客户导出一天的订单,分批按100条去查每个订单详情,然后每个订单有关联的运单信息需要另外分批按10个10个的去查。不要问小义为什么一次查询的数量这么少,数据我管不着,接口都是第三方服务的,他们只能这么支持。

为提高查询效率,只能利用多线程了。先新建一个通用的线程池和线程工厂。

  • 线程工厂
代码语言:javascript
复制
import com.sun.istack.NotNull;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @description: 线程工厂,可自定义线程名
 */
public class NamedThreadFactory implements ThreadFactory {

    /**
     * 线程名前缀
     */
    private final String prefix;

    /**
     * 线程编号
     */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    /**
     * 创建线程工厂
     *
     * @param prefix 线程名前缀
     */
    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(@NotNull Runnable r) {
        return new Thread(null, r, prefix + threadNumber.getAndIncrement());
    }
}
  • 配置线程池
代码语言:javascript
复制
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Description:线程池基础配置
 */
@Configuration
public class ExecutorConfig implements WebMvcConfigurer {

    @Bean(value = "orderExecutorService")
    public ExecutorService aaasMiniExecutorService() {
        return new MdcThreadPoolExecutor(5, 10, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(3000),
                new NamedThreadFactory("order"));
    }

}
  • 执行代码

假设总共有5000条订单,分5000/100=50个父线程去查订单详情,每个父线程再新建100/10=10个子线程去查运单号。所以父线程要等待子线程执行完然后组装订单信息。

写个单元测试复现一下

代码语言:javascript
复制
public class ThreadPoolTest {

    @Autowired
    private ExecutorService orderExecutorService;

    @Test
    public void executorTest() {
        List<CompletableFuture<List<OrderDetailDTO>>> orderFutureList = Lists.newArrayList();
        for (int i = 0; i < 50; i++) {
            int finalPageNo = i;
            orderFutureList.add(CompletableFuture.supplyAsync(() -> {
                List<OrderDetailDTO> orderInfoList = queryOrderInfo(finalPageNo,100);
                return orderInfoList;
            }, orderExecutorService));
        }
        List<OrderDetailDTO> orderDetailDTOList = orderFutureList.stream().map(f -> f.join())
                .flatMap(l -> l.stream()).collect(Collectors.toList());
        return;
    }

    private List<OrderDetailDTO> queryOrderInfo(int pageNo, int pageSize) {
        try {
            //查询订单信息
            List<OrderDetailDTO> orderList = queryOrderDetail(pageNo, pageSize);
            List<String> orderIdList = orderList.stream().map(OrderDetailDTO::getOrderId).collect(Collectors.toList());
            List<List<String>> orderIdListList = Lists.partition(orderIdList, 10);
            Map<String, String> idMap = queryBatchTrackId(orderIdListList);
            orderList.stream().forEach(orderDetailDTO -> {
                orderDetailDTO.setTrackId(idMap.getOrDefault(orderDetailDTO.getOrderId(),""));
            });
        } catch (Exception e) {
            log.error("查询订单详情异常:{}",e.getMessage(),e);
        }
        return Lists.newArrayList();
    }

    private Map<String, String> queryBatchTrackId(List<List<String>> orderIdListList) {
        Map<String, String> map = new HashMap<>();
        List<CompletableFuture<List<TrackInfo>>> trackFutureList = Lists.newArrayList();
        if (CollUtil.isEmpty(orderIdListList)) return map;
        try{
            for (List<String> list : orderIdListList) {
                trackFutureList.add(CompletableFuture.supplyAsync(() -> {
                    List<TrackInfo> trackList = queryTrackInfoByOrderId(list);
                    return trackList;
                }, orderExecutorService));
            }
            map = trackFutureList.stream().map(f -> f.join()).flatMap(l -> l.stream())
                    .collect(Collectors.toMap(TrackInfo::getOrderId, TrackInfo::getTrackId));
        }catch (Exception e) {
            log.error("批量查询运单id异常:{}",e.getMessage(),e);
        }
        return map;
    }

    private List<OrderDetailDTO> queryOrderDetail(int pageNo, int pageSize) {
        //...
    }

    private List<TrackInfo> queryTrackInfoByOrderId(List<String> list) {
        //...
    }
}

利用jstack命令分析一下线程状态

代码语言:javascript
复制
"order5" #23 prio=5 os_prio=0 tid=0x000000001a2b4000 nid=0x6c04 waiting on condition [0x000000002311e000]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x00000000e06b6598> (a java.util.concurrent.CompletableFuture$Signaller)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
 at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
 at org.coco.cat.utils.ThreadPoolTest.lambda$queryBatchTrackId$5(ThreadPoolTest.java:81)
 at org.coco.cat.utils.ThreadPoolTest$$Lambda$735/383614241.apply(Unknown Source)
 at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.coco.cat.utils.ThreadPoolTest.queryBatchTrackId(ThreadPoolTest.java:82)
 at org.coco.cat.utils.ThreadPoolTest.queryOrderInfo(ThreadPoolTest.java:60)
 at org.coco.cat.utils.ThreadPoolTest.lambda$executorTest$0(ThreadPoolTest.java:45)
 at org.coco.cat.utils.ThreadPoolTest$$Lambda$723/1058101486.get(Unknown Source)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
 - <0x00000000dfdfa878> (a java.util.concurrent.ThreadPoolExecutor$Worker)

可以看到线程已经被锁住了,无法执行任务

  • 问题根源

orderExecutorService核心线程数是5,最大线程是10,队列长度3000。因为父任务过多,一小子就把5个核心线程全部占有了,其他父任务和子任务只能到队列中等候,只有队列塞满了,才会另外起工作线程。这时候所有核心线程因为要等待子任务完成才能结束,而子任务又切好躺在队列中无法执行,所以就造成了循环依赖,也就是死锁,线程池被阻塞,无法工作了。

吃一堑长一智,总结是为了更好的提升,祝大家一起变得更强。

本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-01-01,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 程序员小义 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云 BI
腾讯云 BI(Business Intelligence,BI)提供从数据源接入、数据建模到数据可视化分析全流程的BI能力,帮助经营者快速获取决策数据依据。系统采用敏捷自助式设计,使用者仅需通过简单拖拽即可完成原本复杂的报表开发过程,并支持报表的分享、推送等企业协作场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com