zoukankan      html  css  js  c++  java
  • kafka 教程(三)-远程访问

    远程连接 kafka 配置

    默认的 kafka 配置是无法远程访问的,解决该问题有几个方案。

    方案1

    advertised.listeners=PLAINTEXT://IP:9092

    注意必须是 ip,不能是 hostname

    方案2

    advertised.listeners=PLAINTEXT://node0:9092

    node0 是 hostname,需在 /etc/hosts 中 添加一行

    172.16.89.80 node0

    然后 必须在 远程机(要访问 kafka 的机器 windows)上修改 hosts文件,

    C:WindowsSystem32driversetchosts

    在末尾加上 

    IP1  节点1
    IP2  节点2

    节点名与服务器上的 hostname 相同。

    测试异常记录

    WARN [Consumer clientId=consumer-1, groupId=console-consumer-4184] Connection to node -1 (/172.16.89.80:9092) could not be established. Broker may not be available
    . (org.apache.kafka.clients.NetworkClient)

    kafka 配置 与 console 启动的 ip 不一致,如 配置文件中 listeners=PLAINTEXT://172.16.89.80:9092,启动 是  localhost

    kafka.errors.NoBrokersAvailable: NoBrokersAvailable

    listeners=PLAINTEXT://172.16.89.80:9092

    基础操作

    最简单的场景,生产者发送,消费者接收

    Producer

    send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

    producer = KafkaProducer(bootstrap_servers = '172.16.89.80:9092')
    # producer = KafkaProducer(bootstrap_servers = 'node0:9092')
    # 如果这里是 ip,后面必须加 producer.flush() 或者 producer.close();
    # 如果是 hostname,则不需要,但是好像有丢包
    print(producer.config)  # 打印配置信息
    
    topic = '91202'
    for i in range(200):
        msg = "msg%d" % i
        producer.send(topic, msg)
    
    producer.flush()
    # producer.close()

    上面的注释是我亲测的结果,至于为什么,我还没明白,谁能帮我解答

    Consumer

    topic = '91202'
    consumer = KafkaConsumer(topic, bootstrap_servers=['172.16.89.80:9092'])
    # consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print recv
        time.sleep(2)

    注意 这种消费方式只能消费实时数据,不能消费历史数据

    如果想读历史消息,可以这样写

    consumer = KafkaConsumer(topic, auto_offset_reset='earliest', bootstrap_servers=['172.16.89.80:9092'])

     auto_offset_reset:重置 offset,earliest 表示移到最早的可用消息,lastest 为最新消息,默认 latest

    源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

    输出

    生产者不指定分区,消费者输出如下

    91101:1:8: key=None value=msg0
    91101:2:12: key=None value=msg1
    91101:0:17: key=None value=msg2

    3 条消息分到了3个分区

    如果不指定分区,一个 topic 的多个消息会被分发到不同的分区;消费者也收到了所有分区的消息

    生产者指定分区,消费者输出如下

    91101:2:13: key=None value=msg0
    91101:2:14: key=None value=msg1
    91101:2:15: key=None value=msg2

    3 条消息被分到同一个分区

    阻塞发送

    kafka send 消息是异步的,即使 send 发生错误,程序也不会提示,我们可以通过 阻塞 的方式确认是否发送。

    method1-get

    import pickle
    import time
    from kafka import KafkaProducer
    from kafka.errors import kafka_errors
    producer = KafkaProducer(
        bootstrap_servers=['172.16.89.80:9092'],
        key_serializer=lambda k: pickle.dumps(k),
        value_serializer=lambda v: pickle.dumps(v))
    start_time = time.time()
    for i in range(0, 100):
        future = producer.send(topic="9908", key="num", value=i)
        # 同步阻塞,通过调用get()方法保证有序性.
        try:
            record_metadata = future.get(timeout=20)
            # print(record_metadata.topic)
            # print(record_metadata.partition)
            # print(record_metadata.offset)
        except kafka_errors as e:
            print(str(e))
    end_time = time.time()
    time_counts = end_time - start_time
    print(time_counts)

    method2-flush

    import pickle
    import time
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=['172.16.89.80:9092'],
                             key_serializer=lambda k: pickle.dumps(k),
                             value_serializer=lambda v: pickle.dumps(v))
    start_time = time.time()
    for i in range(0, 100):
        future = producer.send('9909', key='num', value=i, partition=0)
    # 将缓冲区的全部消息push到broker当中
    producer.flush()
    producer.close()
    end_time = time.time()
    time_counts = end_time - start_time
    print(time_counts)


    格式化输入输出

    KafkaProducer 指定生产者输入的格式转换方式,key_serializer、value_serializer 用于格式化 key 和 value

    KafkaConsumer 指定消费者输出的格式转换方式,value_deserializer

    指定消费者组

    之前讲过,不同消费者组消费相同的 topic,互不影响;同一个消费者组的不同成员在同一时刻不能消费同一个 topic 相同的分区;一个线程相当于一个消费者      【验证】

    这里来验证一下

    Producer

    不指定分区,20条消息,3个分区

    producer = KafkaProducer(bootstrap_servers = '172.16.89.80:9092')
    print(producer.config)##打印配置信息
    
    topic = '91101'
    for i in range(20):
        msg = "msg%d" % i
        producer.send(topic, msg)
        time.sleep(2)
    producer.close()

    Consumer

    开两个线程,或者两个窗口,指定相同的消费者组,数组名 随便写,一样即可;

    也是实时消费;

    topic = '91101'
    consumer = KafkaConsumer(topic, group_id='111', bootstrap_servers=['172.16.89.80:9092'])
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print recv

    输出

    一个消费者取到 0和1 分区的 value

    91101:1:28: key=None value=msg1
    91101:0:51: key=None value=msg3
    91101:0:52: key=None value=msg5
    91101:0:53: key=None value=msg9
    91101:0:54: key=None value=msg10
    91101:1:29: key=None value=msg11
    91101:0:55: key=None value=msg12
    91101:1:30: key=None value=msg15
    91101:0:56: key=None value=msg16
    91101:1:31: key=None value=msg17
    91101:0:57: key=None value=msg18

    一个消费者取到 2 分区的 value

    91101:2:46: key=None value=msg0
    91101:2:47: key=None value=msg2
    91101:2:48: key=None value=msg4
    91101:2:49: key=None value=msg6
    91101:2:50: key=None value=msg7
    91101:2:51: key=None value=msg8
    91101:2:52: key=None value=msg13
    91101:2:53: key=None value=msg14
    91101:2:54: key=None value=msg19

    也就是没有消费不同的分区,结论正确。

    由于只有2个消费者,3个分区,所以必须有一个消费者消费2个分区;

    如果是3个消费者应该就不存在这种情况,经验证,确实如此;

    如果有再多的消费者,就分不到消息了;

    这也验证了 之前讲的,同一个topic,不推荐多于 partition 个数的 消费者来消费,会造成资源浪费。        【验证】

    小结

    1. 消费者不指定组,能收到所有分区的 消息

    2. 如果指定了组,同组的不同消费者会消费不同的分区

    3. 如果2个分区2个消费者,则一人一个分区 ;如果2个分区3个消费者,则有一个人收不到消息;

    4. 如果想消费同一分区,指定不同的组

    这种特性可以用作 负载均衡

    设定 offset

    kafka 提供了 偏移量 offset 的概念,根据偏移量可以读取 终端开启前 未接收的数据, 也可以读取任意位置的数据  【可读取历史数据】

    为了验证效果,操作如下

    1. 创建一个新的 topic

    2. 开启生产者,不开启消费者,并发送一些数据

    3. 开启消费者,并指定 分区 和 偏移量,设置偏移量为 0

    4. 再次运行消费者,指定相同的分区,偏移量设置为 4

    topic = '91107'
    consumer = KafkaConsumer(group_id='111', bootstrap_servers=['172.16.89.80:9092'])
    
    # consumer 指定主题和分区
    consumer.assign([TopicPartition(topic, partition=0), TopicPartition(topic, partition=1)])       #
    
    # 获取主题的分区信息,
    print consumer.partitions_for_topic(topic)  # None
    
    # 获取 consumer 的指定
    print consumer.assignment() # set([TopicPartition(topic='91107', partition=0), TopicPartition(topic='91107', partition=1)])
    
    # 获取 consumer 指定分区 的 起始offset
    print consumer.beginning_offsets(consumer.assignment()) 
    # {TopicPartition(topic=u'91107', partition=0): 0, TopicPartition(topic=u'91107', partition=1): 0}
    
    consumer.seek(TopicPartition(topic, partition=1), 4)
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print recv

    输出

    # offset 0
    91107:1:0: key=None value=msg2
    91107:1:1: key=None value=msg6
    91107:1:2: key=None value=msg10
    91107:1:3: key=None value=msg11
    91107:1:4: key=None value=msg13
    91107:1:5: key=None value=msg19
    
    # offset 4
    91107:1:4: key=None value=msg13
    91107:1:5: key=None value=msg19

    相当于一个文件中从头到尾 6 行,offset 4 从第 4 行开始读,【行数从 0 开始】

    异常记录

    kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.

    KafkaConsumer 和 assign 不能同时 指定 topic

    小结

    1. offset 相当于文件中的 offset 概念

    2. 指定 offset 时,必须指定分区,一个分区相当于一个文件,指定分区就相当于指定文件,offset 表示 从文件中 offset 行开始读

    3. offset 功能可以读取 历史数据

    定时主动拉取

    主动拉取可能拉不到

    from kafka import KafkaConsumer
    import time
    
    # consumer = KafkaConsumer(bootstrap_servers=['node0:9092'])      # 这样写只能获取最新消息
    consumer = KafkaConsumer(bootstrap_servers=['node0:9092'], auto_offset_reset='earliest')        # 这样可以从头拉取
    consumer.subscribe(topics=('91201','91202'))
    
    while True:
        msg = consumer.poll(timeout_ms=5)   # 从kafka获取消息
        print(msg.keys())
        print('*' * 100)
        time.sleep(2)

    主动从多个 topic 拉取数据

    输出

    []
    ****************************************************************************************************
    [TopicPartition(topic=u'91201', partition=1), TopicPartition(topic=u'91201', partition=0)]
    ****************************************************************************************************
    [TopicPartition(topic=u'91201', partition=0), TopicPartition(topic=u'91201', partition=2)]
    ****************************************************************************************************
    [TopicPartition(topic=u'91202', partition=1), TopicPartition(topic=u'91201', partition=2)]

    可以看到有的回合没有拉倒数据

    消息挂起与恢复

    挂起,消费者不能消费,恢复后,才能消费

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time
    
    topic = '91202'
    consumer = KafkaConsumer(bootstrap_servers=['node0:9092'])
    consumer.subscribe(topics=(topic))
    consumer.topics()
    
    consumer.pause(TopicPartition(topic=topic, partition=0))  # pause执行后,consumer不能读取,直到调用resume后恢复。
    num = 0
    while True:
        print(num)
        print(consumer.paused())   # 获取当前挂起的消费者
        msg = consumer.poll(timeout_ms=5)
        print(msg)
        time.sleep(2)
        num = num + 1
        if num == 10:
            print("resume...")
            consumer.resume(TopicPartition(topic=topic, partition=0))
            print("resume......")

    参考资料:

    https://www.cnblogs.com/tigerzhouv587/p/11232398.html  python 发送kafka

    https://kafka-python.readthedocs.io/en/master/usage.html

    https://blog.csdn.net/luanpeng825485697/article/details/81036028    好全的资料

    https://www.jianshu.com/p/776c188cefa9  从 zookeeper 消费

    https://cloud.tencent.com/developer/news/202116  也挺全的资料

  • 相关阅读:
    学习CLR Via C#的一些体会
    ScrollView动画滚动
    使用blend自定义symbol
    Silverlight中消除ToolTip的白色背景
    nil,NULL,NSNull的区别
    app store,Mac app store 下载加速的方法
    发布时NSLog不打印信息
    TestFlight的使用步骤
    “Could not change executable permissions on the application”的原因和解决方法
    iOS6地图“查看路线”、导航功能的实现
  • 原文地址:https://www.cnblogs.com/yanshw/p/11493576.html
Copyright © 2011-2022 走看看