1.包装?ProducerRecord 对象
Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
2.指定分区
接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。
3.放入缓存
分好区的消息不是直接被发送到服务端,而是放入了生产者的一个缓存里面。在这个缓存里面,多条消息会被封装成为一个批次(batch),默认一个批次的大小是 16K。
4.发送消息
Sender 线程启动以后会从缓存里面去获取可以发送的批次。把这些记录批次发送到相应的 broker 上。
?
5.接收返回
服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试
?
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kaflca。 而在对侧, 消费者需要用反序列化器(Deserializer)把从Kaflca 中收到的字节数组转换成相应的对象。消息的key和value都使用了字符串, 对应程序中的 序列化器也使用了客户端自带的org.apache.kaflca. common. serialization. StringSerializer, 除了用于 String 类型的序列化器,还有ByteArray、ByteBuffer、 Bytes、 Double、Integer、 Long这几种类 型, 它们都实现了org.apache.kaflca. common. serialization. Serializer接口, 此接口有3个方法:
public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()
configure()方法用来配置当前类,serialize()方法用来执行序列化操作。 而close()方法用来关闭当前的序列化器, 一般情况下close()是一个空方法, 如果实现了此方法, 则必须确保此方法的幕等性, 因为这个方法很可能会被KafkaProducer 调用多次。 生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的, 如果生产者使用了 某种序列化器, 比如StringSerializer, 而消费者使用了另 一种序列化器, 比如IntegerSerializer,那么是无法解析出想要的数据的
kakfa支持配置自定义序列化:只需将KafkaProducer的value.serializer 参数设置为CompanySerializer类的全限定名即可。
public int partition(S七ring topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster);
public void close()
如果key不为null,那么分区号会是所有分区中的任意一个,如果为null则仅会为可用分区中的任意一个
public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record );
public void onAcknowledgement(RecordMetadata metadata , Exception exception );
public void close() ;
KafkaProducer 在将消息序列化和计算分区 前会调 生产者拦截器 onSend() 方法来对消息进行相应 定制化操作。一般来说最好不要修改消息 ProducerRecord 的topic和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏 差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩( Log Compaction) 的功能
send()本身是异步的,但是调用send()后可以通过代码实现同步还是异步,异步:一旦消息被保存在等待发送的消息缓存
中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。当然也可以使用同步发送但是性能差,不推荐
简单同步发送实现方法:
在调用send方法后直接调用get方法强行堵塞
RecordMetadata metadata = producer.send(record).get();
异步实现
通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("进行异常处理");
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
在kafka生产者中大部分的参数都有合理的默认值,一般不需要修改它们
(1)acks=1
默认值即为1?。生产者发送消息之后,只要分区的 leader 副本成功写入消 息,那么它就会收到来自服务端的成功响应,如果消息无法写入 leader 副本,比如在 leader副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并 返回成功响应给生产者,且在被其他 fo llo wer 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息 acks 设置为1,是消息可靠性和吞吐量之间的折中方案
(2)acks = 0
?
(3)acks = -1或acks =all
生产者在消 息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下, acks 设置为 (all )可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为?JSR 中可能只有 leader 副本,这样就退化成了 acks=1?的情况。要获得更高的消息 可靠性需要配合 min.insync.replicas 参数的联动
注意 acks 参数配置的值是一个字符串类型,而不是整数类型
该参数用于控制生产者发送的请求大小,它可以指发送的单个消息的最大值,kafka默认的发送一条消息的大小是1M
发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。默认是0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常, 比如网络抖动、 le der 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于0值,以此通过内部重试来恢复而不是一昧地将异常抛给生产者的应用程序。但是不是所有异常都能处理,比如超过消息最大值的异常
retry.backoff.ms用来设置两次重试之间的间隔
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入 Producer Batch 时间,默认值为 。生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms 值时发迭出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;
metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。
?
【51CTO.com快译】Mint和Ubuntu是初学者当中最受欢迎的两种Linux发行版。它们之...
目? 录 引? 言 1? 性能测试的基本概念 2? 性能测试的分类 2.1 负载测试 2.2 压力...
这年头,你看到的东西未必就是你认为的东西。一个mysql协议的后面,可能是tidb;...
一. XSL入门 1.XSL---XML的样式表 HTML网页使用预先确定的标识(tags),这就是说...
01.分区表性能 PostgreSQL 对分区表的支持由来已久。在 10.0 之前,分区表需要用...
1.从小到大,我最拿手、也是最拿得出手的魔术就是把钱变没。 2.生活就像心电图...
比较吃力的地方是drop到播放列表,查MSDN查了n久,还有WindowMediaPlayer6.x插件...
从Zabbix 2.X版本开始,就已经可以监控VMware的产品了,但要求VMware vSphere的...
让我帮你决定。 长期以来,数据湖在业界引起了极大的轰动,通常被视为解决所有数...
今天, 谷歌正式推送了Chrome 89稳定版,其修复了不少问题,比如继续降低对内存...