  • Deploying a Kafka Cluster on Kubernetes

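    1. Introduction to Kafka

    Kafka is a distributed, partitioned, replicated, multi-subscriber log system that relies on ZooKeeper for coordination. Its main characteristics are:
    1. Message persistence with O(1) time complexity: access time stays constant even with terabytes of stored data.
    2. High throughput: even on inexpensive commodity hardware, a single machine can handle about 100K messages per second.
    3. Messages are partitioned across Kafka servers and consumed in a distributed fashion, while message order is preserved within each partition.
    4. Support for both offline (batch) and real-time data processing.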

    2. Use cases

    1. Log collection
    2. Data delivery (push)
    3. Use as a large buffer
    4. Service middleware

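    3. Architecture

    As shown in the architecture diagram, a Kafka cluster consists of a number of producers (server logs, business data, page views generated by web front ends, and so on), a number of brokers (Kafka scales horizontally; in general, the more brokers, the higher the cluster throughput), a number of consumer groups, and a ZooKeeper cluster. Kafka relies on ZooKeeper to manage cluster configuration and elect leaders; the older ZooKeeper-based consumers also relied on it to rebalance when consumer group membership changed.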

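    3.1 Terminology

    • Broker
      A message-processing node (server). Each node is one broker, and a Kafka cluster consists of one or more brokers.

    • Topic
      Kafka categorizes messages by topic; every message sent to the cluster must specify a topic.

    • Partition
      A physical concept. Each topic contains one or more partitions, and each partition corresponds to a directory that holds the partition's data and index files. Messages within a partition are ordered.

    • Producer
      Publishes messages to brokers.

    • Consumer
      Reads messages from brokers.

    • ConsumerGroup
      Each consumer belongs to a specific consumer group. A group name can be set for each consumer; if none is set, the consumer belongs to the default group. A message can be delivered to several consumer groups, but within one consumer group only one consumer consumes it.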

    4. Deploying Kafka on a Kubernetes cluster

    4.1 Prerequisites

    • A Kubernetes cluster with at least three nodes (version 1.13.10 is used here)
    • An existing StorageClass (cephfs is used here)

    4.2 Deployment YAML files

    1. YAML file for ZooKeeper

    [root@k8s001 kafka]# cat zookeeper.yaml 
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-hs
      namespace: kafka
      labels:
        app: zk
    spec:
      ports:
      - port: 2888
        name: server
      - port: 3888
        name: leader-election
      clusterIP: None
      selector:
        app: zk
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-cs
      namespace: kafka
      labels:
        app: zk
    spec:
      ports:
      - port: 2181
        name: client
      selector:
        app: zk
    ---
    apiVersion: policy/v1beta1
    kind: PodDisruptionBudget
    metadata:
      name: zk-pdb
      namespace: kafka
    spec:
      selector:
        matchLabels:
          app: zk
      maxUnavailable: 1
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: zk
      namespace: kafka
    spec:
      selector:
        matchLabels:
          app: zk
      serviceName: zk-hs
      replicas: 3
      updateStrategy:
        type: RollingUpdate
      podManagementPolicy: Parallel
      template:
        metadata:
          labels:
            app: zk
        spec:
          nodeSelector:
              travis.io/schedule-only: "kafka"
          tolerations:
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "NoSchedule"
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "NoExecute"
            tolerationSeconds: 3600
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "PreferNoSchedule"
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values:
                        - zk
                  topologyKey: "kubernetes.io/hostname"
          containers:
          - name: kubernetes-zookeeper
            imagePullPolicy: IfNotPresent
            image: fastop/zookeeper:3.4.10
            resources:
              requests:
                memory: "200Mi"
                cpu: "0.1"
            ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
            command:
            - sh
            - -c
            - "start-zookeeper 
              --servers=3 
              --data_dir=/var/lib/zookeeper/data 
              --data_log_dir=/var/lib/zookeeper/data/log 
              --conf_dir=/opt/zookeeper/conf 
              --client_port=2181 
              --election_port=3888 
              --server_port=2888 
              --tick_time=2000 
              --init_limit=10 
              --sync_limit=5 
              --heap=512M 
              --max_client_cnxns=60 
              --snap_retain_count=3 
              --purge_interval=12 
              --max_session_timeout=40000 
              --min_session_timeout=4000 
              --log_level=INFO"
            readinessProbe:
              exec:
                command:
                - sh
                - -c
                - "zookeeper-ready 2181"
              initialDelaySeconds: 10
              timeoutSeconds: 5
            livenessProbe:
              exec:
                command:
                - sh
                - -c
                - "zookeeper-ready 2181"
              initialDelaySeconds: 10
              timeoutSeconds: 5
            volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
          # runAsUser and fsGroup must be set to 0 (root) here; otherwise ZooKeeper fails with a permission error
          securityContext:
            runAsUser: 0
            fsGroup: 0
      volumeClaimTemplates:
      - metadata:
          name: datadir
        spec:
          accessModes: [ "ReadWriteMany" ]
          storageClassName: cephfs
          resources:
            requests:
              storage: 20Gi
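
    The ensemble size (replicas: 3) and the PodDisruptionBudget (maxUnavailable: 1) in the manifest above fit together: ZooKeeper needs a strict majority of its servers to stay available, so a three-server ensemble tolerates the loss of one. A minimal shell sketch of that arithmetic follows; the function names are illustrative and not part of any ZooKeeper tooling.

```shell
#!/bin/sh
# Majority quorum size for an ensemble of N servers: floor(N/2) + 1.
zk_quorum_size() {
  echo $(( $1 / 2 + 1 ))
}

# Number of servers that can fail while a majority is still alive.
zk_tolerated_failures() {
  echo $(( $1 - $(zk_quorum_size "$1") ))
}

for n in 3 5; do
  echo "ensemble=$n quorum=$(zk_quorum_size "$n") tolerated_failures=$(zk_tolerated_failures "$n")"
done
```

    With three replicas, allowing more than one pod to be unavailable at a time would risk losing the quorum during node maintenance.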
    

    2. YAML file for Kafka

    [root@k8s001 kafka]# cat kafka.yaml 
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-svc
      namespace: kafka
      labels:
        app: kafka
    spec:
      ports:
      - port: 9092
        name: server
      clusterIP: None
      selector:
        app: kafka
    ---
    apiVersion: policy/v1beta1
    kind: PodDisruptionBudget
    metadata:
      name: kafka-pdb
      namespace: kafka
    spec:
      selector:
        matchLabels:
          app: kafka
      minAvailable: 2
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
      namespace: kafka
    spec:
      selector:
         matchLabels:
            app: kafka
      serviceName: kafka-svc
      replicas: 3
      template:
        metadata:
          labels:
            app: kafka
        spec:
          nodeSelector:
              travis.io/schedule-only: "kafka"
          tolerations:
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "NoSchedule"
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "NoExecute"
            tolerationSeconds: 3600
          - key: "travis.io/schedule-only"
            operator: "Equal"
            value: "kafka"
            effect: "PreferNoSchedule"
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values: 
                        - kafka
                  topologyKey: "kubernetes.io/hostname"
            podAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                 - weight: 1
                   podAffinityTerm:
                     labelSelector:
                        matchExpressions:
                          - key: "app"
                            operator: In
                            values: 
                            - zk
                     topologyKey: "kubernetes.io/hostname"
          terminationGracePeriodSeconds: 300
          containers:
          - name: k8s-kafka
            imagePullPolicy: IfNotPresent
            image: fastop/kafka:2.2.0
            resources:
              requests:
                memory: "600Mi"
                cpu: 500m
            ports:
            - containerPort: 9092
              name: server
            command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} 
              --override listeners=PLAINTEXT://:9092 
              --override zookeeper.connect=zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181 
              --override log.dir=/var/lib/kafka 
              --override auto.create.topics.enable=true 
              --override auto.leader.rebalance.enable=true 
              --override background.threads=10 
              --override compression.type=producer 
              --override delete.topic.enable=false 
              --override leader.imbalance.check.interval.seconds=300 
              --override leader.imbalance.per.broker.percentage=10 
              --override log.flush.interval.messages=9223372036854775807 
              --override log.flush.offset.checkpoint.interval.ms=60000 
              --override log.flush.scheduler.interval.ms=9223372036854775807 
              --override log.retention.bytes=-1 
              --override log.retention.hours=168 
              --override log.roll.hours=168 
              --override log.roll.jitter.hours=0 
              --override log.segment.bytes=1073741824 
              --override log.segment.delete.delay.ms=60000 
              --override message.max.bytes=1000012 
              --override min.insync.replicas=1 
              --override num.io.threads=8 
              --override num.network.threads=3 
              --override num.recovery.threads.per.data.dir=1 
              --override num.replica.fetchers=1 
              --override offset.metadata.max.bytes=4096 
              --override offsets.commit.required.acks=-1 
              --override offsets.commit.timeout.ms=5000 
              --override offsets.load.buffer.size=5242880 
              --override offsets.retention.check.interval.ms=600000 
              --override offsets.retention.minutes=1440 
              --override offsets.topic.compression.codec=0 
              --override offsets.topic.num.partitions=50 
              --override offsets.topic.replication.factor=3 
              --override offsets.topic.segment.bytes=104857600 
              --override queued.max.requests=500 
              --override quota.consumer.default=9223372036854775807 
              --override quota.producer.default=9223372036854775807 
              --override replica.fetch.min.bytes=1 
              --override replica.fetch.wait.max.ms=500 
              --override replica.high.watermark.checkpoint.interval.ms=5000 
              --override replica.lag.time.max.ms=10000 
              --override replica.socket.receive.buffer.bytes=65536 
              --override replica.socket.timeout.ms=30000 
              --override request.timeout.ms=30000 
              --override socket.receive.buffer.bytes=102400 
              --override socket.request.max.bytes=104857600 
              --override socket.send.buffer.bytes=102400 
              --override unclean.leader.election.enable=true 
              --override zookeeper.session.timeout.ms=6000 
              --override zookeeper.set.acl=false 
              --override broker.id.generation.enable=true 
              --override connections.max.idle.ms=600000 
              --override controlled.shutdown.enable=true 
              --override controlled.shutdown.max.retries=3 
              --override controlled.shutdown.retry.backoff.ms=5000 
              --override controller.socket.timeout.ms=30000 
              --override default.replication.factor=1 
              --override fetch.purgatory.purge.interval.requests=1000 
              --override group.max.session.timeout.ms=300000 
              --override group.min.session.timeout.ms=6000 
              --override inter.broker.protocol.version=2.2.0 
              --override log.cleaner.backoff.ms=15000 
              --override log.cleaner.dedupe.buffer.size=134217728 
              --override log.cleaner.delete.retention.ms=86400000 
              --override log.cleaner.enable=true 
              --override log.cleaner.io.buffer.load.factor=0.9 
              --override log.cleaner.io.buffer.size=524288 
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 
              --override log.cleaner.min.cleanable.ratio=0.5 
              --override log.cleaner.min.compaction.lag.ms=0 
              --override log.cleaner.threads=1 
              --override log.cleanup.policy=delete 
              --override log.index.interval.bytes=4096 
              --override log.index.size.max.bytes=10485760 
              --override log.message.timestamp.difference.max.ms=9223372036854775807 
              --override log.message.timestamp.type=CreateTime 
              --override log.preallocate=false 
              --override log.retention.check.interval.ms=300000 
              --override max.connections.per.ip=2147483647 
              --override num.partitions=4 
              --override producer.purgatory.purge.interval.requests=1000 
              --override replica.fetch.backoff.ms=1000 
              --override replica.fetch.max.bytes=1048576 
              --override replica.fetch.response.max.bytes=10485760 
              --override reserved.broker.max.id=1000 "
            env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
            volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
            readinessProbe:
              tcpSocket:
                port: 9092
              timeoutSeconds: 1
              initialDelaySeconds: 5
          securityContext:
            runAsUser: 1000
            fsGroup: 1000
      volumeClaimTemplates:
      - metadata:
          name: datadir
        spec:
          accessModes: [ "ReadWriteMany" ]
          storageClassName: cephfs
          resources:
            requests:
              storage:  20Gi
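
    The broker.id override in the start command relies on StatefulSet pod naming: pods are named <statefulset-name>-<ordinal>, and the shell expansion ${HOSTNAME##*-} strips everything up to the last hyphen, leaving only the ordinal. A small sketch of that expansion, runnable outside the cluster; broker_id_from_hostname is a hypothetical helper used only for illustration.

```shell
#!/bin/sh
# Mirror the ${HOSTNAME##*-} expansion used in the Kafka start command:
# "##*-" removes the longest prefix that ends in "-".
broker_id_from_hostname() {
  name="$1"
  echo "${name##*-}"
}

for pod in kafka-0 kafka-1 kafka-2; do
  echo "$pod -> broker.id=$(broker_id_from_hostname "$pod")"
done
```

    Because StatefulSet ordinals start at 0 and stay stable across restarts, each broker keeps the same id and is reattached to its own persistent volume.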
    

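    4.3 Deployment

    ZooKeeper and Kafka are both stateful services, so controllers such as Deployment and ReplicationController (rc) are not suitable for them. Here we deploy both with StatefulSets.

    4.3.1 Label the nodes

    Label the nodes on which Kafka should run: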

    kubectl label node [node-name] travis.io/schedule-only=kafka 
    

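    To keep other workloads off these nodes, you can also taint them. The ZooKeeper and Kafka pods defined above carry matching tolerations, so they can still be scheduled there.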

    kubectl taint node [node-name] travis.io/schedule-only=kafka:NoSchedule
    

    4.3.2 Create the namespace

    [root@k8s001 kafka]# kubectl create ns kafka
    

    4.3.3 Create the ZooKeeper service

    # Create the ZooKeeper service
    [root@k8s001 kafka]# kubectl apply -f zookeeper.yaml
    # Check the status of the ZooKeeper pods
    [root@k8s001 kafka]# kubectl get pod -n kafka
    NAME      READY   STATUS        RESTARTS   AGE
    zk-0      1/1     Running       0          7m8s
    zk-1      1/1     Running       0          7m8s
    zk-2      1/1     Running       0          7m8s
    

    4.3.4 Create the Kafka service

    [root@k8s001 kafka]# kubectl apply -f kafka.yaml
    [root@k8s001 kafka]# kubectl get pod -n kafka
    NAME      READY   STATUS    RESTARTS   AGE
    kafka-0   1/1     Running   0          11m
    kafka-1   1/1     Running   0          11m
    kafka-2   1/1     Running   0          10m
    zk-0      1/1     Running   0          6m44s
    zk-1      1/1     Running   0          6m44s
    zk-2      1/1     Running   0          6m44s
    

    4.3.5 Testing

    Test ZooKeeper:
    kubectl exec -it zk-0 -n kafka -- zkServer.sh status
    kubectl exec -it zk-0 -n kafka -- zkCli.sh create /hello world
    # Delete and recreate the ZooKeeper resources, then check that the znode is still there
    kubectl delete -f zookeeper.yaml
    kubectl apply -f zookeeper.yaml
    kubectl exec -it zk-0 -n kafka -- zkCli.sh get /hello
    Test Kafka:
    kubectl exec -it kafka-0 -n kafka -- bash
    >kafka-topics.sh --create \
      --topic test \
      --zookeeper zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181 \
      --partitions 3 \
      --replication-factor 2
    >kafka-topics.sh --list --zookeeper zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181
    >kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
    # Open another terminal and enter the kafka-1 container
    kubectl exec -it kafka-1 -n kafka -- bash
    >kafka-console-producer.sh --topic test --broker-list localhost:9092
    Type some messages and watch them appear in the output of kafka-console-consumer.sh running in kafka-0.
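
    The ZooKeeper connection string used above follows the stable DNS pattern that a headless Service gives StatefulSet pods: <pod>.<service>.<namespace>.svc.cluster.local. A hedged sketch that builds such a list instead of typing it by hand; statefulset_endpoints is a hypothetical helper, and it assumes the cluster domain cluster.local, as the manifests above do.

```shell
#!/bin/sh
# Build a comma-separated host:port list for the pods of a StatefulSet.
# Arguments: statefulset name, headless service, namespace, port, replicas.
statefulset_endpoints() {
  sts="$1"; svc="$2"; ns="$3"; port="$4"; replicas="$5"
  list=""
  i=0
  while [ "$i" -lt "$replicas" ]; do
    host="${sts}-${i}.${svc}.${ns}.svc.cluster.local:${port}"
    if [ -z "$list" ]; then
      list="$host"
    else
      list="${list},${host}"
    fi
    i=$((i + 1))
  done
  echo "$list"
}

# ZooKeeper ensemble as defined in zookeeper.yaml
statefulset_endpoints zk zk-hs kafka 2181 3
# Kafka brokers as defined in kafka.yaml
statefulset_endpoints kafka kafka-svc kafka 9092 3
```

    Inside the kafka-0 container the helper could be used as, for example, kafka-topics.sh --list --zookeeper "$(statefulset_endpoints zk zk-hs kafka 2181 3)". The second call produces a broker list that clients in the same cluster could pass as --bootstrap-server instead of localhost:9092.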
    
  • Original article: https://www.cnblogs.com/yuhaohao/p/14098523.html