  • 04_kafka Python client: simulating a Producer

    Python library used: kafka-python

    Installation: pip install kafka-python

    A simple simulated Producer

    # -*- encoding: utf-8 -*-
    # Author: shayzhang@sina.com
    """
    Kafka Producer Test
    using kafka-python library
    """
    
    # import KafkaProducer class
    from kafka import KafkaProducer
    # import KafkaTimeoutError class
    from kafka.errors import KafkaTimeoutError
    # time for message timestamp
    import time
    
    
    def main():
        # create the producer instance, passing the bootstrap_servers list (brokers); producer settings can be changed here
        producer = KafkaProducer(bootstrap_servers=["192.168.229.100:9092", "192.168.229.101:9092", "192.168.229.102:9092"])
    
        # topic to be published
        topic = 'ctopic'
    
        # message value to be published, must be bytes type
        msg = bytes('hello_from_python', encoding='utf-8')
        # for python2:  msg = b'hello_from_python'
    
        # message key, must be bytes type
        # used to determine which partition the message will be stored in
        key = bytes('shay', encoding='utf-8')
        # for python2:  key = b'shay'
    
        # send() is asynchronous by default
        try:
    
            # get partitions for the topic
            partition_set = producer.partitions_for(topic)
            for e in partition_set:
                print("Partition: " + str(e))
                # for python2:  print 'Partition: ' + str(e)
    
            # timestamp_ms must be an integer count of milliseconds since the epoch
            future = producer.send(topic, msg, key, partition=None, timestamp_ms=int(time.time() * 1000))
            # block until all buffered records are sent to the cluster
            producer.flush()
    
            print("Message sent!")
            # for python2:  print "Message sent!"
        except KafkaTimeoutError:
            print("Kafka Timeout")
    
    
    if __name__ == '__main__':
        main()
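
    The script above encodes the key and the value by hand with bytes(...). As a rough sketch of an alternative, assuming kafka-python's key_serializer and value_serializer constructor options, the producer can be handed one encoding function so that send() accepts plain strings. The helper names encode_utf8, now_ms and build_producer are hypothetical, and the kafka import is deferred so the two small helpers can be tried without a broker.

```python
import time


def encode_utf8(text):
    """Encode a str to UTF-8 bytes; pass None through (e.g. a message without a key)."""
    if text is None:
        return None
    return text.encode("utf-8")


def now_ms():
    """Current time as an integer number of milliseconds since the epoch."""
    return int(time.time() * 1000)


def build_producer(servers):
    """Create a producer that encodes keys and values itself (hypothetical helper)."""
    # Imported here so the helpers above work even where kafka-python is not installed.
    from kafka import KafkaProducer
    return KafkaProducer(
        bootstrap_servers=servers,
        key_serializer=encode_utf8,
        value_serializer=encode_utf8,
    )
```

    With such a producer, the send call would presumably become producer.send('ctopic', value='hello_from_python', key='shay', timestamp_ms=now_ms()).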

    On any one node of the cluster, start a console-consumer, then run this .py file

    The consumer receives the message
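
    Delivery can also be checked from the producer side. The following is a minimal sketch, assuming the object returned by producer.send() exposes get(timeout=...) and that its result carries topic, partition and offset attributes, as kafka-python's RecordMetadata does. describe_delivery is a hypothetical helper name, and the Fake* classes are stand-ins that exist only so the snippet runs without a broker.

```python
from collections import namedtuple

# Stand-in for the metadata kafka-python returns (assumed fields: topic, partition, offset).
FakeMetadata = namedtuple("FakeMetadata", ["topic", "partition", "offset"])


class FakeFuture:
    """Stub imitating the future returned by producer.send(); for illustration only."""

    def __init__(self, metadata):
        self._metadata = metadata

    def get(self, timeout=None):
        return self._metadata


def describe_delivery(future, timeout=10):
    """Block until the send completes, then report where the record landed."""
    metadata = future.get(timeout=timeout)
    return "%s[%d] @ offset %d" % (metadata.topic, metadata.partition, metadata.offset)


print(describe_delivery(FakeFuture(FakeMetadata("ctopic", 0, 42))))
```

    In the script itself, describe_delivery(future) could be called on the value returned by producer.send(...); a failed or timed-out send would be expected to surface as an exception from get().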

  • Original article: https://www.cnblogs.com/shay-zhangjin/p/8012253.html