前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文读懂springboot整合kafka

一文读懂springboot整合kafka

原创
作者头像
QGS
发布2024-05-03 14:12:35
2900
发布2024-05-03 14:12:35
举报
文章被收录于专栏:QGS星球QGS星球

安装kafka

启动Kafka本地环境需Java 8+以上

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。

Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

解压tar -xzf kafka_2.13-3.7.0.tgz

一、Zookeeper启动Kafka(kafka内置zookeeper)

Kafka依赖Zookeeper

1、启动Zookeeper 2、启动Kafka

使用kafka自带Zookeeper启动

./zookeeper-server-start.sh ../config/zookeeper.properties &

./zookeeper-server-stop.sh ../config/zookeeper.properties

./kafka-server-start.sh ../config/server.properties &

./kafka-server-stop.sh ../config/server.properties

二、Zookeeper服务器启动Kafka

Zookeeper服务器安装

https://zookeeper.apache.org/

https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz

tar zxvf apache-zookeeper-3.9.2-bin.tar.gz

配置Zookeeper服务器

cp zoo_sample.cfg zoo.cfg

启动Zookeeper服务器

./zkServer.sh start

修改Zookeeper端口

Zoo.cfg添加内容

admin.serverPort=8099

apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper

Zookeeper服务器启动kafka

/opt/kafka_2.13-3.7.0/bin目录下

./kafka-server-start.sh ../config/server.properties &

Kafka配置文件server.properties

三、使用KRaft启动Kafka

UUID通用唯一识别码(Universally Unique Identifier)

1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid

2.格式化kafka日志目录:./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties

3.启动kafka:./kafka-server-start.sh ../config/kraft/server.properties &

springboot集成kafka

创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)

修改server.properties文件

vim server.properties

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://192.168.68.133:9092

springboot加入依赖kafka

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

代码语言:java
复制
application.yml配置连接kafka
spring:
 kafka:
 bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

代码语言:java
复制
@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
 kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

代码语言:java
复制
@Component
public class KafkaConsumer {

 @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
 public void consume(String message){
 System.out.println("接收到消息:"+message);
 }

}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

代码语言:java
复制
@Component
public class KafkaConsumer {

 @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
 public void consume(String message){
 System.out.println("接收到消息:"+message);
 }

}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

代码语言:java
复制
spring:
  kafka:
  bootstrap-servers: 192.168.68.133:9092
  consumer:
  auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

脚本重置消费者组偏移量

代码语言:txt
复制
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装kafka
  • springboot集成kafka
    • 修改server.properties文件
      • springboot加入依赖kafka
        • 生产者
          • 消费者
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
          http://www.vxiaotou.com