zoukankan      html  css  js  c++  java
  • docker安装kafka及使用

    一、下载镜像

    docker pull wurstmeister/zookeeper
    docker pull wurstmeister/kafka

    二、先启动zookeeper

    #单机方式
    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

    三、启动kafka

    #单机方式
    docker run -d --name kafka
    -p 9092:9092
    -e KAFKA_BROKER_ID=0
    -e KAFKA_ZOOKEEPER_CONNECT=10.0.0.101:2181
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.101:9092
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

    四、创建一个topic(使用代码次步可省略)

    #进入容器
    docker exec -it ${CONTAINER ID} /bin/bash
    cd opt/bin
    #单机方式:创建一个主题
    bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
    #运行一个生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
    #运行一个消费者
    bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning

    五、kafka设置分区数量

    #分区数量的作用:有多少分区就能负载多少个消费者,生产者会自动分配给分区数据,每个消费者只消费自己分区的数据,每个分区有自己独立的offset
    #进入kafka容器
    vi opt/kafka/config/server.properties
    修改run.partitions=2
    #退出容器
    ctrl+p+q
    #重启容器
    docker restart kafka

    #修改指定topic
    ./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname

    六、python代码

    #生产者
    from kafka import KafkaProducer
    import json
    import datetime

    topic='test'
    producer = KafkaProducer(bootstrap_servers='10.0.0.101:9092',value_serializer=lambda m:json.dumps(m).encode("utf-8")) # 连接kafka
    # 参数bootstrap_servers:指定kafka连接地址
    # 参数value_serializer:指定序列化的方式,我们定义json来序列化数据,当字典传入kafka时自动转换成bytes
    # 用户密码登入参数
    # security_protocol="SASL_PLAINTEXT"
    # sasl_mechanism="PLAIN"
    # sasl_plain_username="maple"
    # sasl_plain_password="maple"

    for i in range(1000):
      data={"num":i,"ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
      producer.send(topic,data)

    producer.close()
    #消费者
    from kafka import KafkaConsumer
    import time

    topic = 'test'
    consumer = KafkaConsumer(topic, bootstrap_servers=['10.0.0.101:9092'], group_id="test", auto_offset_reset="earliest")
    # 参数bootstrap_servers:指定kafka连接地址
    # 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各自消费相同的数据,互不影响
    # 参数auto_offset_reset:默认为latest表示offset设置为当前程序启动时的数据位置,earliest表示offset设置为0,在你的group_id第一次运行时,还没有offset的时候,给你设定初始offset。一旦group_id有了offset,那么此参数就不起作用了


    for msg in consumer:
      recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
      print(recv)
      # time.sleep(1)
    #运行3个消费者结果
    test:0:3212: key=None value=b'{"num": 981, "ts": "2021-02-23 16:38:14"}'
    test:0:3213: key=None value=b'{"num": 982, "ts": "2021-02-23 16:38:14"}'
    test:0:3214: key=None value=b'{"num": 987, "ts": "2021-02-23 16:38:14"}'
    test:0:3215: key=None value=b'{"num": 997, "ts": "2021-02-23 16:38:14"}'
    test:0:3216: key=None value=b'{"num": 998, "ts": "2021-02-23 16:38:14"}'
    test:0:3217: key=None value=b'{"num": 999, "ts": "2021-02-23 16:38:14"}'

    test:1:353: key=None value=b'{"num": 970, "ts": "2021-02-23 16:38:14"}'
    test:1:354: key=None value=b'{"num": 977, "ts": "2021-02-23 16:38:14"}'
    test:1:355: key=None value=b'{"num": 978, "ts": "2021-02-23 16:38:14"}'
    test:1:356: key=None value=b'{"num": 979, "ts": "2021-02-23 16:38:14"}'
    test:1:357: key=None value=b'{"num": 984, "ts": "2021-02-23 16:38:14"}'
    test:1:358: key=None value=b'{"num": 985, "ts": "2021-02-23 16:38:14"}'
    test:1:359: key=None value=b'{"num": 994, "ts": "2021-02-23 16:38:14"}'

    test:2:317: key=None value=b'{"num": 989, "ts": "2021-02-23 16:38:14"}'
    test:2:318: key=None value=b'{"num": 990, "ts": "2021-02-23 16:38:14"}'
    test:2:319: key=None value=b'{"num": 991, "ts": "2021-02-23 16:38:14"}'
    test:2:320: key=None value=b'{"num": 992, "ts": "2021-02-23 16:38:14"}'
    test:2:321: key=None value=b'{"num": 993, "ts": "2021-02-23 16:38:14"}'
    test:2:322: key=None value=b'{"num": 995, "ts": "2021-02-23 16:38:14"}'
    test:2:323: key=None value=b'{"num": 996, "ts": "2021-02-23 16:38:14"}'

     

  • 相关阅读:
    重写不受限制
    类的向上转型(安全)向下转型(不安全)
    类的继承之构造函数和析构函数的顺序
    父类和子类的截断现象
    派生类重写方法
    运算符重载总结(大全)
    运算符重载方法3
    运算符重载方法2
    Shell基础编程
    TCP Wrappers(简单防火墙)---限制IP登录ssh
  • 原文地址:https://www.cnblogs.com/angelyan/p/14445710.html
Copyright © 2011-2022 走看看