本文以Linux CentOS环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。
使用前请参考收集连接信息收集Kafka所需的连接信息。
一般系统预装了Python。在命令行输入python,得到如下回显,说明Python已安装。
[root@ecs-test python-kafka]# python3 Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
如果未安装Python,请使用以下命令安装:
yum install python
pip install kafka-python==2.0.1
以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
from kafka import KafkaProducer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##证书文件 context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
from kafka import KafkaConsumer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##证书文件 context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', 'consumer_id': 'consumer-id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
前言 最近越来越多的读者认可我的文章,还是件挺让人高兴的事情。有些读者私信我...
双线主机机房的种类一般有四种: 1、单线单IP实现的是单线路,速度相当于双线路...
您可以根据业务需要购买相应计算能力和存储空间的Redis实例,同时可购买多个Redi...
从传媒高管到创业新兵,在看到音频赛道的巨大价值后,利用5年时间不断打磨商业闭...
据外媒报道,市场研究机构Canalys发布的报告显示,2020年第二季度,中国云基础设...
如何申请一个网站 域名 ?网站域名是客户区别于同类网站的标识,也是客户到访网...
功能介绍 通过流水线或执行步骤前置操作中webhook类型的url触发流水线或stage。 ...
1. 接口描述 接口请求域名: ecdn.tencentcloudapi.com 。 本接口(UpdateDomain...
人们了解道大数据实际上对企业融资有多大影响吗?在探索的过程中可以找到有关这种...
随着公司业务发展,对大数据的获取和实时处理的要求就会越来越高,日志处理、用...