前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-消费者偏移量__consumer_offsets_相关解析

kafka-消费者偏移量__consumer_offsets_相关解析

原创
作者头像
Get
发布2024-03-10 20:33:45
1400
发布2024-03-10 20:33:45

https://blog.csdn.net/z69183787/article/details/109810468

在kafka的log文件中发现了还有很多以?__consumer_offsets_的文件夹;总共50个;

代码语言:java
复制
考虑到一个 kafka 生成环境中可能有很多consumer?和?consumer group,如果这些 consumer 同时提交位移,
则必将加重 __consumer_offsets 的写入负载,

因此 kafka 默认为该 topic 创建了50个分区,并且对每个?group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,
从而将负载分散到不同的 __consumer_offsets 分区上。
分区数可以通过?offsets.topic.num.partitions?参数设置,默认值为50。

clipboard.png
clipboard.png
代码语言:java
复制
由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,
即__consumer_offsets?topic,并且默认提供了/bin/kafka_consumer_groups.sh 脚本供用户查看consumer信息。
__consumer_offsets?是 kafka 自行创建的,和普通的 topic 相同,它存在的目的之一就是保存 consumer 提交的位移。
__consumer_offsets?的每条消息格式大致如图所示:
1、group.id:
2、topic + partition:
3、offset:
KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

clipboard.png
clipboard.png

生成、消费流程:

代码语言:java
复制
1. 消费Topic消息
打开一个session a,执行下面的消费者命令 ;指定了消费组:szz1-group;?topic:szz1-test-topic
bin/kafka-console-consumer.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic
2.产生消息
打开一个新的session b,执行生产消息命令
bin/kafka-console-producer.sh --broker-list  xxx1:9092,xxx2:9092,xxx3:9092  --topic szz1-test-topic
发送几条消息

010271124076.png
010271124076.png

然后可以看到刚刚打开的 session a 消费了消息;

027112455861.png
027112455861.png
代码语言:java
复制
3. 查看指定消费组的消费位置 offset
bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group
可以看到图中 展示了每个partition?对应的消费者id; 因为只开了一个消费者; 所以是这个消费者同时消费3个partition;
TOPIC:主题
PARTTION:分区ID
CURRENT-OFFSET: 当前消费组消费到的偏移量
LOG-END-OFFSET: 日志最后的偏移量 
LAG:落差,指还有几个消息没有被消费(LOG-END-OFFSET - CURRENT-OFFSET = 0,说明当前消费组已经全部消费了)
CONSUMER-ID:消费者 ID
HOST:消费者 IP
CLIENT-ID:消费组 ID

clipboard.png
clipboard.png

那么我把 session a 关掉;现在没有消费者之后; 我再发送几条消息看看;

027113444742.png
027113444742.png
代码语言:java
复制
发送了2条消息之后,?partition-0?partition-1?的LOG-END-OFFSET: 日志最后的偏移量分别增加了1; 
但是CURRENT-OFFSET: 当前消费组消费到的偏移量?保持不变; 这是因为没有被消费;
重新打开一个消费组 继续消费,
重新打开session之后, 会发现控制台输出了刚刚发送的2条消息; 并且偏移量也更新了

027113806535.png
027113806535.png
代码语言:java
复制
4. 从头开始消费 --from-beginning
如果我们用新的消费组去消费一个Topic,那么默认这个消费组的offset会是最新的; 也就是说历史的不会消费
例如下面我们新开一个session c ;消费组设置为szz1-group3
bin/kafka-console-consumer.sh --bootstrap-server   xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3    --topic szz1-test-topic
查看消费情况
bin/kafka-consumer-groups.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092  --describe --group szz1-group3

027123506994.png
027123506994.png
代码语言:java
复制
可以看到 CURRENT-OFFSET?=?LOG-END-OFFSET?;
如何让新的消费组/者 从头开始消费呢? 加上参数?--from-beginning
5.如何确认 consume_group 在哪个__consumer_offsets-? 中
Math.abs(groupID.hashCode()) % numPartitions
6. 查找__consumer_offsets 分区数中的消费组偏移量offset
上面的?3. 查看指定消费组的消费位置offset?中,我们知道如何查看指定的topic消费组的偏移量;
那还有一种方式也可以查询
先通过?consume_group?确定分区数; 例如?"szz1-group".hashCode()%50=32; 
那我们就知道?szz-group消费组的偏移量信息存放在?__consumer_offsets_32中;
通过命令
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

027135823471.png
027135823471.png
代码语言:java
复制
前面的 是key 后面的是value;key由 消费组+Topic+分区数 确定; 后面的value就包含了 消费组的偏移量信息等等
然后接着我们发送几个消息,并且进行消费; 上面的控制台会自动更新为新的offset;
7 查询topic的分布情况
bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC名称
8 清理?__consumer_offsets?

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com