前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >k8s运维: kafka和zookeeper在k8s集群踩的一些坑

k8s运维: kafka和zookeeper在k8s集群踩的一些坑

作者头像
机械视角
发布2021-03-06 22:32:01
3.6K1
发布2021-03-06 22:32:01
举报
文章被收录于专栏:TensorbytesTensorbytes

zookeeper配置istio sidecar后存在的网络不可用问题

如果zookeeper配置了istio sidecar ,在选举阶段就会报connection refused(Connection refused)错误,如下图:

这主要是因为 zookeeper 在server之间通信默认是监听 pod IP 地址,而istio要求监听0.0.0.0,因此需要设置quorumListenOnAllIPs=true

具体问题可以参考:https://istio.io/latest/faq/applications/

这个不止在 zookeeper 中会出现,包括 Apache NiFi 、 Cassandra、 Elasticsearch、Redis 中安装 sidecar 模式都会存在这个问题。

由于docker官方的zookeeper镜像没有提供 quorumListenOnAllIPs 的参数,我们需要直接手动添加,详细参考这个issue: https://github.com/31z4/zookeeper-docker/issues/117

或者可以用 bitnami/zookeeper 这个镜像,这个镜像提供了 quorumListenOnAllIPs 支持,可以通过设置ZOO_LISTEN_ALLIPS_ENABLED环境变量来控制,下面是简单的deployment文件:

代码语言:javascript
复制
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-1
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
    spec:
      containers:
      - name: zookeeper
        image: bitnami/zookeeper:3.6.2
        imagePullPolicy: Always
        ports:
        - containerPort: 2181
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_LISTEN_ALLIPS_ENABLED
          value: "true"
        - name: ZOO_SERVER_ID
          value: "1"
        - name: ZOO_SERVERS
          value: 0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
--- 
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-2
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
    spec:
      containers:
      - name: zookeeper
        image: bitnami/zookeeper:3.6.2
        imagePullPolicy: Always
        ports:
        - containerPort: 2181
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_LISTEN_ALLIPS_ENABLED
          value: "true"
        - name: ZOO_SERVER_ID
          value: "2"
        - name: ZOO_SERVERS
          value: zookeeper-1:2888:3888,0.0.0.0:2888:3888,zookeeper-3:2888:3888
--- 

kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-3
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
    spec:
      containers:
      - name: zookeeper
        image: bitnami/zookeeper:3.6.2
        imagePullPolicy: Always
        ports:
        - containerPort: 2181
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_LISTEN_ALLIPS_ENABLED
          value: "true"
        - name: ZOO_SERVER_ID
          value: "3"
        - name: ZOO_SERVERS
          value: zookeeper-1:2888:3888,zookeeper-2:2888:3888,0.0.0.0:2888:3888

再设置好 services 就可以 running 了。

kafka在k8s外网访问设置项

k8s 对外暴露一般都会走ingress,但kafka由于起自身特殊的connect机制,我们需要专门设置kafka让其客户端感知到其目标连接。

kafka 和客户端建立连接:

  • 客户端向 kafka server 发起 findCoordinator 请求,寻找可以建立连接的协调者,server 会返回broker连接地址
  • 客户端获得地址后,会创建该 Broker 的 Socket 连接,并保持心跳上报,连接建立起来之后初始和第一个borker的连接会被关闭

由于 kafka 会主要告诉客户端 broker 的连接地址,因为在对外网开放的时候我们需要把 broker 地址设置成外网可访问的地址,这里以wurstmeister/kafka的kafka为例,可以通过以下设置让外网访问:

代码语言:javascript
复制
kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-broker0
  namespace: databases
spec:
  replicas: 1
  selector:
    matchLabels:
        app: kafka
        id: "kafka-broker0"
  template:
    metadata:
      labels:
        app: kafka
        id: "kafka-broker0"
    spec:
      containers:
      - name: kafka
        image: "wurstmeister/kafka:2.12-2.5.0"
        imagePullPolicy: "IfNotPresent"
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "INSIDE://kafka-broker0:9092,OUTSIDE://kafka.db.tensorbytes.com:10000"
        - name: KAFKA_LISTENERS
          value: "INSIDE://:9092,OUTSIDE://:10000"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INSIDE"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-1:2181
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_CREATE_TOPICS
          value: mp_post_slog:1:1
        - name: LOG4J_LOGGER_KAFKA_AUTHORIZER_LOGGER
          value: "DEBUG"
        resources:
          limits:
            cpu: 200m
            memory: 512Mi

环境变量:

代码语言:javascript
复制
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

istio virtualserver 配置文件:

代码语言:javascript
复制
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: kafka-router
  namespace: databases
spec:
  gateways:
  - db-external-gateway
  hosts:
  - kafka.db.tensorbytes.com
  tcp:
  - match:
    - port: 10000
    route:
    - destination:
        host: kafka-broker0.databases.svc.cluster.local
        port:
          number: 10000

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker0
  labels:
    name: kafka
  namespace: rcmd
spec:
  ports:
  - port: 9092
    name: internal-port
    protocol: TCP
    targetPort: 9092
  - port: 10000
    name: external-port
    protocol: TCP
    targetPort: 10000
  selector:
    app: kafka
    id: "kafka-broker0"
  type: ClusterIP
集群外通过网关访问的测试脚本

生产者:

代码语言:javascript
复制
#coding:utf-8
from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='kafka.db.tensorbytes.com:10000')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
producer.flush(timeout=10)

消费者:

代码语言:javascript
复制
#coding:utf-8
from kafka import KafkaConsumer

consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka.db.tensorbytes.com:10000', group_id='my_favorite_group')

for msg in consumer:
    print(msg)
集群内测试脚本

生产者:

代码语言:javascript
复制
#coding:utf-8
from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='kafka-broker0:9092')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
producer.flush(timeout=10)

消费者:

代码语言:javascript
复制
#coding:utf-8
from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='kafka-broker0:9092')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
producer.flush(timeout=10)
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-02-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • zookeeper配置istio sidecar后存在的网络不可用问题
  • kafka在k8s外网访问设置项
    • 集群外通过网关访问的测试脚本
      • 集群内测试脚本
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
      http://www.vxiaotou.com