消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:
// 1. 创建一个保存字符串的队列 QueuestringQueue = new LinkedList();
// 2. 往消息队列中放入消息 stringQueue.offer( "hello" );
// 3. 从消息队列中取出消息并打印 System.out.println(stringQueue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。
我们可以简单理解消息队列就是将需要传输的数据存放在队列中。
消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。
目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。
消息队列的应用场景
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。
image.png
image.png
1.1.1.1 ?日志处理(大数据领域常见)
大型电商网站(淘宝、京东、国美、苏宁...)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。
image.png
image.png
image.png
image.png
image.png
点对点模式特点:
每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
image.png
image.png
Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
1.?发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
2.?以容错的持久化方式存储数据流
处理数据流
1.?Publish and subscribe:发布与订阅
2.?Store:存储
3.?Process:处理
我们通常将Apache Kafka用在两类程序:
1.?建立实时数据管道,以可靠地在系统或应用程序之间获取数据
2.?构建实时流应用程序,以转换或响应数据流
image.png
上图,我们可以看到:
1.?Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
2.?Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
3.?Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
数据库中。
4.?Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。
Kafka比ActiveMQ牛逼得多
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最高) |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | 不支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。
image.png
image.png
image.png
image.png
image.png
目录名称 | 说明 |
---|---|
bin | Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 |
config | Kafka的所有配置文件 |
libs | 运行Kafka所需要的所有JAR包 |
logs | Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 |
site-docs | Kafka的网站帮助文件 |
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
基于1个分区1个副本的基准测试
测试步骤:
1.?启动Kafka集群
2.?创建一个1个分区1个副本的topic: benchmark
3.?同时运行生产者、消费者基准测试程序
4.?观察结果
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
<**repositories**><!--?代码库?-->??
????<**repository**>??
????????<**id**>central</**id**>??
????????<**url**>http://maven.aliyun.com/nexus/content/groups/public//</**url**>??
????????<**releases**>??
????????????<**enabled**>true</**enabled**>??
????????</**releases**>??
????????<**snapshots**>??
????????????<**enabled**>true</**enabled**>??
????????????<**updatePolicy**>always</**updatePolicy**>??
????????????<**checksumPolicy**>fail</**checksumPolicy**>??
????????</**snapshots**>??
????</**repository**>??
</**repositories**>??
??
<**dependencies**>??
????<!--?kafka客户端工具?-->??
????<**dependency**>??
????????<**groupId**>org.apache.kafka</**groupId**>??
????????<**artifactId**>kafka-clients</**artifactId**>??
????????<**version**>2.4.1</**version**>??
????</**dependency**>??
??
????<!--?工具类?-->??
????<**dependency**>??
????????<**groupId**>org.apache.commons</**groupId**>??
????????<**artifactId**>commons-io</**artifactId**>??
????????<**version**>1.3.2</**version**>??
????</**dependency**>??
??
????<!--?SLF桥接LOG4J日志?-->??
????<**dependency**>??
????????<**groupId**>org.slf4j</**groupId**>??
????????<**artifactId**>slf4j-log4j12</**artifactId**>??
????????<**version**>1.7.6</**version**>??
????</**dependency**>??
??
????<!--?SLOG4J日志?-->??
????<**dependency**>??
????????<**groupId**>log4j</**groupId**>??
????????<**artifactId**>log4j</**artifactId**>??
????????<**version**>1.2.16</**version**>??
????</**dependency**>??
</**dependencies**>??
??
<**build**>??
????<**plugins**>??
????????<**plugin**>??
????????????<**groupId**>org.apache.maven.plugins</**groupId**>??
????????????<**artifactId**>maven-compiler-plugin</**artifactId**>??
????????????<**version**>3.7.0</**version**>??
????????????<**configuration**>??
????????????????<**source**>1.8</**source**>??
????????????????<**target**>1.8</**target**>??
????????????</**configuration**>??
????????</**plugin**>??
????</**plugins**>??
</**build**>
将log4j.properties配置文件放入到resources文件夹中
**log4j.rootLogger**=**INFO,stdout****??
****log4j.appender.stdout**=**org.apache.log4j.ConsoleAppender****??
****log4j.appender.stdout.layout**=**org.apache.log4j.PatternLayout****??
****log4j.appender.stdout.layout.ConversionPattern**=?**%5p?-?%m%n**
image.png
**public?class**?KafkaProducerTest?{??
????**public?static?void**?main(String[]?args)?{??
????????//?1.?创建用于连接Kafka的Properties配置??
????????Properties?props?=?**new**?Properties();??
????????props.put(?**"bootstrap.servers"**?,?**"192.168.88.100:9092"**?);??
????????props.put(?**"acks"**?,?**"all"**?);??
????????props.put(?**"key.serializer"**?,?**"org.apache.kafka.common.serialization.StringSerializer"**?);??
????????props.put(?**"value.serializer"**?,?**"org.apache.kafka.common.serialization.StringSerializer"**?);??
??
????????//?2.?创建一个生产者对象KafkaProducer??
????????KafkaProducer<String,?String>?producer?=?**new**?KafkaProducer<String,?String>(props);??
??
????????//?3.?调用send发送1-100消息到指定Topic?test??
????????**for**(**int**?i?=?0;?i?<?100;?++i)?{??
????????????**try**?{??
????????????????//?获取返回值Future,该对象封装了返回值??
????????????????Future<RecordMetadata>?future?=?producer.send(**new**?ProducerRecord<String,?String>(?**"test"**?,?**null**,?i?+?**""**?));??
????????????????//?调用一个Future.get()方法等待响应??
????????????????future.get();??
????????????}?**catch**?(InterruptedException?e)?{??
????????????????e.printStackTrace();??
????????????}?**catch**?(ExecutionException?e)?{??
????????????????e.printStackTrace();??
????????????}??
????????}??
??
????????//?5.?关闭生产者??
????????producer.close();??
????}??
}
image.png
image.png
**public?class**?KafkaProducerTest?{??
????**public?static?void**?main(String[]?args)?{??
????????//?1.?创建用于连接Kafka的Properties配置??
????????Properties?props?=?**new**?Properties();??
????????props.put(?**"bootstrap.servers"**?,?**"node1.itcast.cn:9092"**?);??
????????props.put(?**"acks"**?,?**"all"**?);??
????????props.put(?**"key.serializer"**?,?**"org.apache.kafka.common.serialization.StringSerializer"**?);??
????????props.put(?**"value.serializer"**?,?**"org.apache.kafka.common.serialization.StringSerializer"**?);??
??
????????//?2.?创建一个生产者对象KafkaProducer??
????????KafkaProducer<String,?String>?producer?=?**new**?KafkaProducer<String,?String>(props);??
??
????????//?3.?调用send发送1-100消息到指定Topic?test??
????????**for**(**int**?i?=?0;?i?<?100;?++i)?{??
????????????**try**?{??
????????????????//?获取返回值Future,该对象封装了返回值??
????????????????Future<RecordMetadata>?future?=?producer.send(**new**?ProducerRecord<String,?String>(?**"test"**?,?**null**,?i?+?**""**?));??
????????????????//?调用一个Future.get()方法等待响应??
????????????????future.get();??
????????????}?**catch**?(InterruptedException?e)?{??
????????????????e.printStackTrace();??
????????????}?**catch**?(ExecutionException?e)?{??
????????????????e.printStackTrace();??
????????????}??
????????}??
??
????????//?5.?关闭生产者??
????????producer.close();??
????}??
}
**public?class**?KafkaProducerTest?{??
????**public?static?void**?main(String[]?args)?{??
????????//?1.?创建用于连接Kafka的Properties配置??
????????Properties?props?=?**new**?Properties();??
????????props.put(?**"bootstrap.servers"**?,?**"node1.itcast.cn:9092"**?);??
????????props.put(?**"acks"**?,?**"all"**?);??
????????props.put(?**"key.serializer"**?,?**"org.apache.kafka.common.serialization.StringSerializer"**?);??
????????props.put(?**"value.serializer"**?,?**"org.apache.kafka.common.serialization.StringSerializer"**?);??
??
????????//?2.?创建一个生产者对象KafkaProducer??
????????KafkaProducer<String,?String>?producer?=?**new**?KafkaProducer<String,?String>(props);??
??
????????//?3.?调用send发送1-100消息到指定Topic?test??
????????**for**(**int**?i?=?0;?i?<?100;?++i)?{??
????????????//?一、同步方式??
????????????//?获取返回值Future,该对象封装了返回值??
????????????//?Future<RecordMetadata>?future?=?producer.send(new?ProducerRecord<String,?String>("test",?null,?i?+?""));??
????????????//?调用一个Future.get()方法等待响应??
????????????//?future.get();??
??
????????????//?二、带回调函数异步方式??
????????????producer.send(**new**?ProducerRecord<String,?String>(?**"test"**?,?**null**,?i?+?**""**?),?**new**?Callback()?{??
????????????????@Override??
????????????????**public?void**?onCompletion(RecordMetadata?metadata,?Exception?exception)?{??
????????????????????**if**(exception?!=?**null**)?{??
????????????????????????System.***out***.println(?**"**?**发送消息出现异常**?**"**?);??
????????????????????}??
????????????????????**else**?{??
????????????????????????String?topic?=?metadata.topic();??
????????????????????????**int**?partition?=?metadata.partition();??
????????????????????????**long**?offset?=?metadata.offset();??
??
????????????????????????System.***out***.println(?**"**?**发送消息到**?**Kafka**?**中的名字为**?**"**?+?topic?+?**"**?**的主题,第**?**"**?+?partition?+?**"**?**分区,第**?**"**?+?offset?+?**"**?**条数据成功**?**!"**?);??
????????????????????}??
????????????????}??
????????????});??
????????}??
??
????????//?5.?关闭生产者??
????????producer.close();??
????}??
}
image.png
image.png
image.png
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。
Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
生产者负责将数据推送给broker的topic
消费者负责从broker的topic中拉取数据,并自己进行处理
image.png
image.png
image.png
image.png
image.png
image.png
//?3.?发送1-100数字到Kafka的test主题中??
**while**(**true**)?{??
????**for**?(**int**?i?=?1;?i?<=?100;?++i)?{??
????????//?注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回??
????????//?这样可以让消息发送变得更高效??
????????producer.send(**new**?ProducerRecord<>(?**"test"**?,?i?+?**""**?));??
????}??
????Thread.*sleep*(3000);??
}
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
*//?1.?创建消费者*
????**public**?**static**?Consumer?**<**?String,?String?**>**??createConsumer()?{
?????????*//?1.?创建Kafka消费者配置*
????????Properties?**props**?**=**?**new**?Properties();
????????**props**.setProperty(?**"**?**bootstrap.servers**?**"**?,??**"**?**node1.itcast.cn:9092**?**"**?);
????????**props**.setProperty(?**"**?**group.id**?**"**?,??**"**?**ods_user**?**"**?);
????????**props**.put(?**"**?**isolation.level**?**"**?,?**"**?**read_committed**?**"**?);
????????**props**.setProperty(?**"**?**enable.auto.commit**?**"**?,??**"**?**false**?**"**?);
????????**props**.setProperty(?**"**?**key.deserializer**?**"**?,??**"**?**org.apache.kafka.common.serialization.StringDeserializer**?**"**?);
????????**props**.setProperty(?**"**?**value.deserializer**?**"**?,??**"**?**org.apache.kafka.common.serialization.StringDeserializer**?**"**?);
?
?????????*//?2.?创建Kafka消费者*
????????KafkaConsumer<String,?String>?**consumer**?**=**?**new**?KafkaConsumer<>(props);
?
?????????*//?3.?订阅要消费的主题*
????????**consumer**.subscribe(**Arrays**.asList(?**"**?**ods_user**?**"**?));
????????
????????**return**?consumer;
}
编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。
image.png
image.png
实现步骤:
1.?调用之前实现的方法,创建消费者、生产者对象
2.?生产者调用initTransactions初始化事务
3.?编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
(1)?生产者开启事务
(2)?消费者拉取消息
(3)?遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
(4)?生产消息到dwd_user topic中
(5)?提交偏移量到事务中
(6)?提交事务
(7)?捕获异常,如果出现异常,则取消事务
**public**?**static**?void?main(String[]?args)?{
????????Consumer<String,?String>?**consumer**?**=**?createConsumer();
????????Producer<String,?String>?**producer**?**=**?createProducer();
?????????*//?初始化事务*
????????**producer**.initTransactions();
?
????????**while**(true)?{
????????????**try**?{
?????????????????*//?1.?开启事务*
????????????????**producer**.beginTransaction();
?????????????????*//?2.?定义Map结构,用于保存分区对应的offset*
????????????????Map<TopicPartition,?OffsetAndMetadata>?**offsetCommits**?**=**?**new**?HashMap<>();
?????????????????*//?2.?拉取消息*
????????????????ConsumerRecords<String,?String>?**records**?**=**?**consumer**.poll(**Duration**.ofSeconds(2));
????????????????**for**?(ConsumerRecord<String,?String>?**record**??**:**??records)?{
?????????????????????*//?3.?保存偏移量*
????????????????????**offsetCommits**.put(**new**?TopicPartition(**record**.topic(),?**record**.partition()),
????????????????????????????**new**?OffsetAndMetadata(**record**.offset()?+?1));
?????????????????????*//?4.?进行转换处理*
????????????????????String[]?**fields**?**=**?**record**.value().split(?**"**?**,**?**"**?);
????????????????????fields[1]?**=**?fields[1].equalsIgnoreCase(?**"**?**1**?**"**?)??**?**???**"**?**男**?**"**?**:**?**"**?**女**?**"**?;
????????????????????String?**message**?**=**?fields[0]??**+**???**"**?**,**?**"**???**+**??fields[1]??**+**???**"**?**,**?**"**???**+**??fields[2];
?????????????????????*//?5.?生产消息到dwd_user*
????????????????????**producer**.send(**new**?ProducerRecord<>(?**"**?**dwd_user**?**"**?,?message));
????????????????}
?????????????????*//?6.?提交偏移量到事务*
????????????????**producer**.sendOffsetsToTransaction(offsetCommits,??**"**?**ods_user**?**"**?);
?????????????????*//?7.?提交事务*
????????????????**producer**.commitTransaction();
????????????}?**catch**?(Exception?**e**)?{
?????????????????*//?8.?放弃事务*
????????????????**producer**.abortTransaction();
????????????}
????????}
????}
image.png
image.png
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
1.?轮询分区策略
2.?随机分区策略
3.?按key分区分配策略
4.?自定义分区策略
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标 | 意义 |
---|---|
Brokers Spread | broker使用率 |
Brokers Skew | 分区是否倾斜 |
Brokers Leader Skew | leader partition是否存在倾斜 |
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标**** | 单分区单副本(ack=0)**** | 单分区单副本(ack=1)**** | 单分区单副本(ack=-1/all)**** |
---|---|---|---|
吞吐量 | 165875.991109/s每秒16.5W条记录 | 93092.533979/s每秒9.3W条记录 | 73586.766156 /s每秒7.3W调记录 |
吞吐速率 | 158.19 MB/sec | 88.78 MB/sec | 70.18 MB |
平均延迟时间 | 192.43 ms | 346.62 ms | 438.77 ms |
最大延迟时间 | 670.00 ms | 1003.00 ms | 1884.00 ms |
image.png
image.png
指标**** | 单分区单副本(ack=0)**** | 单分区单副本(ack=1)**** |
---|---|---|
吞吐量 | 165875.991109 records/sec每秒16.5W条记录 | 93092.533979 records/sec每秒9.3W条记录 |
吞吐速率 | 158.19 MB/sec每秒约160MB数据 | 88.78 MB/sec每秒约89MB数据 |
平均延迟时间 | 192.43 ms avg latency | 346.62 ms avg latency |
最大延迟时间 | 670.00 ms max latency | 1003.00 ms max latency |