当前位置:主页 > 查看内容

SpringBoot集成阿里云Kafka 示例

发布时间:2021-07-01 00:00| 位朋友查看

简介:Step By Step 1、kafka控制台创建公网类型实例 2、创建SpringBoot项目集成阿里云Kafka 3、发送接收测试 一、kafka控制台创建公网类型实例 1.1 Kafka 控制台 创建实例 1.2 获取认证参数 二、创建SpringBoot项目集成阿里云Kafka 2.1 创建Spring Boot(2.5.2)项……
Step By Step

1、kafka控制台创建公网类型实例
2、创建SpringBoot项目集成阿里云Kafka
3、发送接收测试

一、kafka控制台创建公网类型实例

1.1 Kafka控制台创建实例

图片.png

1.2 获取认证参数

图片.png

图片.png

二、创建SpringBoot项目集成阿里云Kafka

2.1 创建Spring Boot(2.5.2)项目

图片.png

2.2 依赖

 properties 
 java.version 1.8 /java.version 
 /properties 
 dependencies 
 dependency 
 groupId org.springframework.boot /groupId 
 artifactId spring-boot-starter-web /artifactId 
 /dependency 
 dependency 
 groupId org.springframework.kafka /groupId 
 artifactId spring-kafka /artifactId 
 /dependency 
 dependency 
 groupId org.springframework.boot /groupId 
 artifactId spring-boot-starter-test /artifactId 
 scope test /scope 
 /dependency 
 dependency 
 groupId org.springframework.kafka /groupId 
 artifactId spring-kafka-test /artifactId 
 scope test /scope 
 /dependency 
 /dependencies 

2.3 Sender.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
 @Autowired
 private KafkaTemplate String, String template;
 public void send(String msg) {
 this.template.sendDefault("my_msg", msg);
 System.out.println("send message:" + msg);
}

2.4 Receiver.class

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
 @KafkaListener(topics = { "taro_topic" }) // 参数配置要监听的Topic
 public void receiveMessage(ConsumerRecord String, String record) {
 System.out.println("Receive Message");
 System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
}

2.5 KafkaController.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
 @Autowired
 private Sender sender;
 @PostMapping("/send/{msg}") // 发送消息测试,注意此处为Post
 public String send(@PathVariable("msg") String msg) {
 sender.send(msg);
 return msg;
}

2.6 application.yml

spring:
 kafka:
 template:
 default-topic: topic 
 bootstrap-servers: SSL接入点 
 jaas:
 enabled: true
 loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
 options:
 username: 用户名 
 password: 密码 
 consumer:
 ssl:
 truststoreLocation: file:/kafka.client.truststore.jks
 properties:
 sasl.mechanism: PLAIN
 security.protocol: SASL_SSL
 ssl.endpoint.identification.algorithm:
 group-id: group 
 max-poll-records: 2
 producer:
 ssl:
 truststoreLocation: file:/kafka.client.truststore.jks
 retries: 3
 acks: 1
 compression-type: lz4
 buffer-memory: 33554432
 batch-size: 51200
 properties:
 send.buffer.bytes: 262144
 sasl.mechanism: PLAIN
 security.protocol: SASL_SSL
 ssl.endpoint.identification.algorithm:
kafka.client.truststore.jks 下载地址,证书下载后直接放在C盘根目录下。

2.7 项目结构

图片.png

三、发送接收测试

3.1 启动项目,使用PostMan发送Post请求

图片.png

3.2 项目日志

图片.png

3.3 控制台消息监控查看

图片.png

更多参考

SSL接入点PLAIN机制收发消息


本文转自网络,原文链接:https://developer.aliyun.com/article/784990
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐