前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-2-生产者-流程

kafka-2-生产者-流程

原创
作者头像
Get
发布2024-03-10 20:19:23
780
发布2024-03-10 20:19:23

https://blog.csdn.net/itcodexy/article/details/109574747

https://www.jianshu.com/p/8a7fa04a5a49

https://segmentfault.com/a/1190000015282836

https://segmentfault.com/u/snailiu

https://www.cnblogs.com/sujing/p/10960832.html

详解:

clipboard.png
clipboard.png
代码语言:java
复制
消息在网络中传输的方式只能通过二级制的方式,所以首先需要将消息序列化为二进制形式缓存在客户端,
kafka 使用了双端队列的方式将消息缓存起来,
然后使用发送线程(Sender)读取队列中的消息交给 Selector 进行网络传输发送给服务端(Broker)
1. 客户端组件
2. 客户端缓存存储模型
3. 确定消息的 partition 位置
4. 发送线程的工作原理


1、通过使用以下四大客户端组件来完成客户端消息的发送工作:
   1、KafkaProducer:是一个生产者客户端的进程,通过该对象启动生产者来发送消息。
   2、RecordAccumulator:是一个记录收集器,用于收集客户端发送的消息,并将收集到的消息暂存到客户端缓存中。
   3、Sender:是一个发送线程,负责读取记录收集器中缓存的批量消息,经过一些中间转换操作,
              将要发送的数据准备好,然后交由 Selector 进行网络传输。
   4、Selector:是一个选择器,用于处理网络连接和读写处理,使用网络连接处理客户端上的网络请求
   
2、客户端缓存模型:一条消息首先需要确定要被存储到那个 partition 对应的双端队列上;
    其次,存储消息的双端队列是以批的维度存储的,即 N 条消息组成一批,一批消息最多存储 N 条,
          超过后则新建一个组来存储新消息;
    其次,新来的消息总是从左侧写入,即越靠左侧的消息产生的时间越晚;
          最后,只有当一批消息凑够 N 条后才会发送给 Broker,否则不会发送到 Broker 上。

3、确定消息的 partition 位置:2 种方式:对Partition哈希求余、轮询
    A:对于指定了 key 的消息,partition 位置的计算方式为:Utils.murmur2(key) % numPartitions
       即先对 key 进行哈希计算,然后在于 partition 个数求余,
       从而得到该条消息应该被存储在哪个 partition 上。
    B:对于没有指定 key 的消息,partition 位置的计算方式为:采用 round-robin 方式确定 partition 位置
       即采用轮询的方式,平均的将消息分布到不同的 partition 上,
       从而避免某些 partition 数据量过大影响 Broker 和消费端性能。
4、发送线程的工作原理
Sender 线程的主要工作是收集满足条件的批数据
    第一步:扫描记录收集器中满足条件的批数据,然后将 partition -> 批数据映射转换成 BrokerId -> N 批数据的映射。
    第二步:Sender 线程会为每个 BrokerId 创建一个客户端请求,然后将请求交给 NetWorkClient,
            由 NetWrokClient 去真正发送网络请求到 Broker。
NetWorkClient 的工作内容
    Sender:该线程准备好要发送的数据后,交由 NetWorkClient 来进行网络相关操作。
            主要包括客户端与服务端的建连、发送客户端请求、接受服务端响应。
            完成如上一系列的工作主要由如下方法完成。
    reday()方法。从记录收集器获取准备完毕的节点,并连接所有准备好的节点。
    send()方法。为每个节点创建一个客户端请求,然后将请求暂时存到节点对应的 Channel(通道)中。
    poll()方法。该方法会真正轮询网络请求,发送请求给服务端节点和接受服务端的响应。

clipboard.png
clipboard.png
clipboard.png
clipboard.png

流程详解:

代码语言:java
复制
消息发送的过程中,涉及到两个线程协同工作:
1、主线程首先将业务数据封装成ProducerRecord对象,
2、之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,
3、Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,
   需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
1、在我们通过代码send()消息之后,这条消息就会发往拦截器Interceptor
2、Interceptor会对数据做处理
    ~ 加解密/脱敏
    ~ 过滤不满足条件的数据(ip白名单、错误编码、脏数据或者残缺数据)
    ~ 统计消息投递成功率或结合第三方工具计算消息在Kafka存储的时间
    ~ 在消息的header里放一个唯一标识,方便下游做去重
    ~ 针对旧版本,新版本Kafka引入了幂等性来保证Once Exactly(刚好一次)
3、对数据进行序列化
    无论是否存在key,都必须给key和value指定序列化方式(消息在网络中传输的方式只能通过二级制的方式),
    可通过实现Serializer自定义序列化规则
4、确定数据分区(确定消息的 partition 位置)
    分区策略很重要,好的分区策略可以解决数据倾斜的问题
    可通过实现Partitioner接口来自定义分区规则,否则规则如下
        ~ 1、如果发送send的时候指定了分区,则使用指定分区
        ~ 2、如未指定,则根据key进行hash,然后对分区数取模(Utils.murmur2(key) % numPartitions)
        ~ 3、如未指定且没key,则轮询发送给分区(低版本采用随机)
5、临时缓存(存储)
    RecordAccumulator采用了双端队列(Deque)数据结构来临时存储
    目的:提高发送数据的吞吐量
        确定消息发送的分区后,会在RecordAccumulator寻找对应的Deque,找不到对应的Deque则新建
        从对应的Deque的尾巴中取出最后一个RecordBatch(记录大小)进行判断:
            ~ 1、如果该Batch加上当前消息的大小,小于batch.size,则追加进去;
            ~ 2、否则创建新的Deque,将当前消息放进去并将Batch放到Deque队列
            注:RecordBatch是写Kafka的最小单位
6、Sender 拉取数据
    当满足linger.ms和buffer.memory任一个条件时,会进行数据的拉取
7、排队发送
    每一个Deque的数据都有一个对应的ClientRequest,负责携带RecordBatch,排队等待前一个RecordBatch的响应
    (Sender 线程会为每个 BrokerId 创建一个客户端请求)
8、包装
    将ClientRequest扔到KafkaChannel中,等待Selector的发送
9、写入Kafka
    这一步骤是真正的往Kafka的Broker中写数据,回应的规则是
        ~ ack=0:发送出去就立马执行第10步,不等待响应
                 典型的 fire and forget , 性能最好,但也最容易丢数据
        ~ ack=1:发送出去,等到那批数据被写到主副本上时,就成功响应,执行10步骤
                 由于只是写到主副本的页缓存,因此存在丢数据的可能
        ~ ack=-1:发送出去,直到ISR队列中包括主副本在内的min.insync.replicas个副本被写成功,才成功响应,执行10步骤
                  ack=-1搭配min.insync.replicas的结果
                  让kafka的副本复制策略游离在同步复制和异步复制之间
                  既避免了同步复制拖慢性能,又提高了异步复制的可靠性
10、回复 NetworkClient,开始下一个RecordBatch的发送
11、NetworkClient回复 RecordAccumulator,开始下一个RecordBatch的发送。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com