当前位置:主页 > 查看内容

kafka消费组信息采集异常(hang住)排查

发布时间:2021-05-11 00:00| 位朋友查看

简介:一、问题描述 小组同学在使用kafka官方工具 kafka-consumer-groups.sh 批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现: 结果不全: 只有部分分区的信息 进程会阻塞: 不会像导他消费组时那样,执行完自动退出 image.png……

一、问题描述

小组同学在使用kafka官方工具kafka-consumer-groups.sh批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现:

  • 结果不全: 只有部分分区的信息
  • 进程会阻塞: 不会像导他消费组时那样,执行完自动退出
image.png

二、问题分析

1、信息梳理

兵马未动,粮草先行。在分析问题前我们需要具体梳理一下潜在的线索:

  • 集群的版本是0.9.0.1的bug?(古老的版本,最近半年会裁撤掉)但目前0.9.0.1版本集群中只有这个出现这样的问题
  • 集群部署不规范,broker和zk端口不统一的问题?但之前也有类似部署不规范的集群,没有出现过这样的问题
  • 集群机器有异常?
  • broker消费特殊场景下的bug?此次异常的消费组大多同时消费2个topic: 一个是日常三副本的topic,一个是离线补录的二副本的topic,确实存在bug的可能性。
  • kafka-consumer-groups.sh特殊场景下的Bug?

2、机器问题排查

1) strace相关进程,发现进程确实阻塞住了

2) 查看/proc/pid/stack看则有下面的堆栈输出

$ cat /proc/12097/stack

[<ffffffff81097b6b>] futex_wait_queue_me+0xdb/0x140
[<ffffffff81097e46>] futex_wait+0x166/0x250
[<ffffffff81099f1e>] do_futex+0xde/0x570
[<ffffffff8109a421>] SyS_futex+0x71/0x150
[<ffffffff81b2a202>] system_call_fastpath+0x16/0x1b
[<ffffffffffffffff>] 0xffffffffffffffff

3) 查看机器版本

与现网其他机器相比,没有太大的差异,机器层面相关性可能不是很大。

3、__consumer_offsetstopic排查

基于broker消费的消费组,其偏移量的元数据信息是存储在__consumer_offsets这个topic下的。笔者之前在《kafka部分group无法正常消费数据排查》一文中曾介绍过因__consumer_offsets问题导致group异常的情况,所以便查看了一下__consumer_offsets的情况,发现一切正常:

image.png

4、进程阻塞原因排查

几经周折,没有发现什么进展,还是决定回到kafka-consumer-groups.sh本身,从查看进程堵塞原因出发。此时就需要我们jstack查看一下进程内诸线程的情况,我们发现:

image.png

进程阻塞在获取某个分区的HW(HighWatermark)上(注意:LEO对消费者是不可见的,所以这里虽然调用的方法是getLogEndOffset,但实际上是获取HW),这时我们就要从源码中进行深入的分析。

三、源码分析

kafka-consumer-groups.sh获取基于broker消费組信息,即调用kafka.admin.ConsumerGroupCommandKafkaConsumerGroupService.describeGroup。相关实现如下:

1、KafkaConsumerGroupService

class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {

  private val adminClient = createAdminClient()

  // `consumer` is only needed for `describe`, so we instantiate it lazily
  private var consumer: KafkaConsumer[String, String] = null

  def list() {
    adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
  }

  protected def describeGroup(group: String) {
    val consumerSummaries = adminClient.describeConsumerGroup(group)
    if (consumerSummaries.isEmpty)
      println(s"Consumer group `${group}` does not exist or is rebalancing.")
    else {
      val consumer = getConsumer()
	  // 打印描述头
      printDescribeHeader()
      consumerSummaries.foreach { consumerSummary =>
        val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
        val partitionOffsets = topicPartitions.flatMap { topicPartition =>
          Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
            topicPartition -> offsetAndMetadata.offset
          }
        }.toMap
        describeTopicPartition(group, topicPartitions, partitionOffsets.get,
          _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
      }
    }
  }

  // 获取HW的值
  protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
    val consumer = getConsumer()
    val topicPartition = new TopicPartition(topic, partition)
    consumer.assign(List(topicPartition).asJava)
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)
    LogEndOffsetResult.LogEndOffset(logEndOffset)
  }

 //省略中间一部分不重要的代码 
 
  private def createNewConsumer(): KafkaConsumer[String, String] = {
    val properties = new Properties()
    val deserializer = (new StringDeserializer).getClass.getName
    val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
   // 不自动提交offset很重要,否则会影响消费组正常的消费(丢数据) 
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))

    new KafkaConsumer(properties)
  }
}

2、CURRENT OFFSETLOG END OFFSET计算规则

其中关键部分:

  • CURRENT OFFSET计算规则:
consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
  topicPartition -> offsetAndMetadata.offset
}
  • HW(脚本显示LOG END OFFSET)计算规则
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)

从jstack分析来看,是这一步卡住了。

3、验证分析

参考HW(脚本显示的LOG END OFFSET)计算规则,实现了一个简单的HW采集程序,分别采集异常消费组下2个topic的情况,来看看具体是哪一步卡住?卡住前后是否有相关日志或输出?

我们发现:

  • 该消费组下日常的topic是正常获取HW
image.png
  • 而离线补录的topic无法正常获取HW值,可能异常
image.png

进而发现补录的topic存在leader为-1的情况。

image.png

推测:因为离线补录的topic大部分是不会在线上生产数据,只会在某些特点场景下由平台侧往里面的一次性导入数据,所有这个古老的集群当时下掉若干个节点时并没有迁移这些一次性的topic,从而在使用kafka-consumer-groups.sh获取消费组产生异常。

四、总结

1、 这次问题分析走了一些弯路,但还是加强了对kafka-consumer-groups.sh实现原理的理解

2、topic leader为-1会造成各种各样奇怪的问题,哪怕是一些不重要的topic。

目前所有高版本的集群针对这类场景有完善的监控,而0.9.0.1这种古老集群还相对不完善,等最近裁撤迁移到新集群后会有很大改善。


本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐