Proxy服务负责消费CKafka消息并解析,并分发消息至不同的CKafka topic。近期发现Proxy服务消费CKafka有消息堆积,且服务所在CVM CPU与内存资源大概只占用50%左右。
如图所示可以看到,在数据量峰值的的时候,生产流量可以达到2000MB/小时,但是消费流量达不到这么多,说明该服务有消息堆积。
其他说明:CKafka partition数量与服务实例数量正好一比一关系,CKafka 消费Client Concurrence设置为1。Proxy服务维护一个线程池,用于解析与分发消费的每一条消息。每当有消息进入服务时,每条消息会用一个线程进行解析消息并发送数据。
@KafkaListener(topics = "topic") public void consumerKafkaMsg(List<ConsumerRecord<?, String>> records) throws Exception { for (ConsumerRecord<?, String> record : records) { log.debug("kafka topic = {}, value:\n{}", record.topic(), record.value()); service.handleMsg(record); } }
@PostConstruct public void init() { BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(consumerCount); RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build(); threadPool = new ThreadPoolExecutor(consumerCount, consumerCount, 0L, TimeUnit.MILLISECONDS, workingQueue, namedThreadFactory, rejectedExecutionHandler); } public void handleMsg(ConsumerRecord<?, String> record) { threadPool.execute(new ThreadPoolTask(recorde)); }
使用Arthas工具分析一下堆栈,如下图,可以看到每个线程都在TIMED_WAITING的等待状态,CPU消耗也很低,初步判断消费堆积并不是因为线程数量不够,而是卡在IO。
查看线程状态也可以看到线程池中每个线程都在等待,卡在方法dosent上面,有可能是CKafka集群限流。
上面代码2中可以看到线程池队列长度是和线程数保持一致,因为线程池的策略是线程数达到队列最大时就由主线程去执行线程作业,从而导致主线程没有继续拉取数据,其他线程执行完成之后没有数据就如上图所示等待主线程完成作业后再去拉取数据。
增大了线程池队列长度之后,发现线程状态还是变化不大,也还是一直在等待主线程。
方法一效果并不是很明显,我们可以换一个思路。在代码1中每条消息都会有一个线程去执行任务,因为消息较多,每个消息一个线程会有点效率低下,可以尝试将一批数据放入一个线程,提高线程的CPU利用率,从而解决问题。
public void handleMsg(List<ConsumerRecord<?, String>> records) { threadPool.execute(new ThreadPoolTask(records)); }
修改完成后发现线程CPU利用率上升明显。
同时重新查看Arthas里面每个线程的状态,线程卡在kafkaTemplate里面的dosent方法,再往上是awiat
查看await方法源码,发现其实是在等待batchSize。因为压测时batchSize设置得比较大,在正式环境中数据量没达到压测大数据量,但是因为这个方法一直在等待batchSize填充完成,所以才导致线程一直在等待没有发送Kafka消息,卡在dosent上面。
PS: 还有一个LingerMs参数控制发送,batchSIze与lingerMs时间哪一个先达到则就发送。LingerMs的默认时间是1分钟。结果与总结
腾讯云监控还是起了很大作用,在调优过程有很大参考意义,Ckafka或者组件都需要进行适当的参数调整才能发挥最大作用
效果还是比较明显可以看到机器CPU负载提高显著,未消费的Kafka消息也慢慢降低,达到预期。
1.世界上最悲哀的事莫过于睡眠不足,尤其是在感冒的时候。 2.我是一个很有原则...
1.我手机掉进厕所了怎么办?是屎在给我发信息吗? 2.生活就像新闻联播,不是换...
?又到一年毕业季,即将开启(实习及正式)职场生涯的同学们会有不少疑惑。比如,...
年味到底是什么?不同的时代,人们迎接春节的方式也在变换。 在70后的眼里,年味...
数据目录已成为企业数据管理策略的重要组成部分,但选择合适的数据目录并不是简...
我们将创建以下三个Shell脚本来锁定和解锁多个帐户和查看账号状态。 创建锁定用...
作者 许力 阿里云原生多模数据库Lindorm与东软云科技推出联合解决方案 共建面向...
新冠肺炎疫情的全球大流行不仅深刻影响了世界政治经济格局的发展演进,而且加速...
Knative 是基于 Kubernetes 的开源 Serverless 应用编排框架。阿里云 Knative 在...
1.男女之间是真的可以有纯友谊的,只要一个打死不说一个装傻到底。 2.不要迷恋...