小组同学在使用kafka官方工具kafka-consumer-groups.sh
批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现:
兵马未动,粮草先行。在分析问题前我们需要具体梳理一下潜在的线索:
kafka-consumer-groups.sh
特殊场景下的Bug?1) strace相关进程,发现进程确实阻塞住了
2) 查看/proc/pid/stack看则有下面的堆栈输出
$ cat /proc/12097/stack [<ffffffff81097b6b>] futex_wait_queue_me+0xdb/0x140 [<ffffffff81097e46>] futex_wait+0x166/0x250 [<ffffffff81099f1e>] do_futex+0xde/0x570 [<ffffffff8109a421>] SyS_futex+0x71/0x150 [<ffffffff81b2a202>] system_call_fastpath+0x16/0x1b [<ffffffffffffffff>] 0xffffffffffffffff
3) 查看机器版本
与现网其他机器相比,没有太大的差异,机器层面相关性可能不是很大。
__consumer_offsets
topic排查基于broker消费的消费组,其偏移量的元数据信息是存储在__consumer_offsets
这个topic下的。笔者之前在《kafka部分group无法正常消费数据排查》一文中曾介绍过因__consumer_offsets
问题导致group异常的情况,所以便查看了一下__consumer_offsets
的情况,发现一切正常:
几经周折,没有发现什么进展,还是决定回到kafka-consumer-groups.sh
本身,从查看进程堵塞原因出发。此时就需要我们jstack查看一下进程内诸线程的情况,我们发现:
进程阻塞在获取某个分区的HW
(HighWatermark
)上(注意:LEO
对消费者是不可见的,所以这里虽然调用的方法是getLogEndOffset
,但实际上是获取HW
),这时我们就要从源码中进行深入的分析。
kafka-consumer-groups.sh
获取基于broker消费組信息,即调用kafka.admin.ConsumerGroupCommand
的KafkaConsumerGroupService.describeGroup
。相关实现如下:
class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { private val adminClient = createAdminClient() // `consumer` is only needed for `describe`, so we instantiate it lazily private var consumer: KafkaConsumer[String, String] = null def list() { adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) } protected def describeGroup(group: String) { val consumerSummaries = adminClient.describeConsumerGroup(group) if (consumerSummaries.isEmpty) println(s"Consumer group `${group}` does not exist or is rebalancing.") else { val consumer = getConsumer() // 打印描述头 printDescribeHeader() consumerSummaries.foreach { consumerSummary => val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) val partitionOffsets = topicPartitions.flatMap { topicPartition => Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => topicPartition -> offsetAndMetadata.offset } }.toMap describeTopicPartition(group, topicPartitions, partitionOffsets.get, _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) } } } // 获取HW的值 protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { val consumer = getConsumer() val topicPartition = new TopicPartition(topic, partition) consumer.assign(List(topicPartition).asJava) consumer.seekToEnd(topicPartition) val logEndOffset = consumer.position(topicPartition) LogEndOffsetResult.LogEndOffset(logEndOffset) } //省略中间一部分不重要的代码 private def createNewConsumer(): KafkaConsumer[String, String] = { val properties = new Properties() val deserializer = (new StringDeserializer).getClass.getName val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt) properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt)) // 不自动提交offset很重要,否则会影响消费组正常的消费(丢数据) properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))) new KafkaConsumer(properties) } }
CURRENT OFFSET
和LOG END OFFSET
计算规则其中关键部分:
CURRENT OFFSET
计算规则:consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => topicPartition -> offsetAndMetadata.offset }
HW
(脚本显示的LOG END OFFSET
)计算规则consumer.seekToEnd(topicPartition) val logEndOffset = consumer.position(topicPartition)
从jstack分析来看,是这一步卡住了。
参考HW
(脚本显示的LOG END OFFSET
)计算规则,实现了一个简单的HW
采集程序,分别采集异常消费组下2个topic的情况,来看看具体是哪一步卡住?卡住前后是否有相关日志或输出?
我们发现:
HW
值HW
值,可能异常进而发现补录的topic存在leader为-1的情况。
推测:因为离线补录的topic大部分是不会在线上生产数据,只会在某些特点场景下由平台侧往里面的一次性导入数据,所有这个古老的集群当时下掉若干个节点时并没有迁移这些一次性的topic,从而在使用kafka-consumer-groups.sh
获取消费组产生异常。
1、 这次问题分析走了一些弯路,但还是加强了对kafka-consumer-groups.sh
实现原理的理解
2、topic leader为-1会造成各种各样奇怪的问题,哪怕是一些不重要的topic。
目前所有高版本的集群针对这类场景有完善的监控,而0.9.0.1这种古老集群还相对不完善,等最近裁撤迁移到新集群后会有很大改善。
云服务器 2核 4核有什么区别?有区别的,这是指的 云服务器 的CPU,也就是服务器...
现代科学技术的发展为智慧城市的建设提供了可能,尤其是 大数据 时代的到来,使...
简介 正则表达式是我们做数据匹配的时候常用的一种工具 虽然正则表达式的语法并...
被贴标签,是生活在这个时代的每个人,都无法避免的事情。 沉迷游戏、农村子弟、...
随着越来越多的企业基于 mPaaS 搭建并上线新的 App App 的上线质量也成为各个客...
参考: 中国信息通信研究院发布的《全球数字治理白皮书(2020)》,《全球数字经济...
公司简介 广州市藏星网络科技有限公司的主要产品是移动互联网应用“上学帮”,产...
1. 接口描述 接口请求域名: cvm.tencentcloudapi.com 。 本接口(InquirePricePu...
不能保证。EIP被释放后进入地址池重新随机分配,若只是短暂停用并希望后续继续使...
问题背景 Linux操作系统安装Oracle数据库时会校验Swap分区大小,如果操作系统自...