zoukankan      html  css  js  c++  java
  • python生产和消费kafka数据

    安装kafka-python

    pip install kafka-python
    

    生产者

    from kafka import KafkaProducer  # 有时候导入包会报错,使用pip uninstall kafka-python,卸载后重装可以解决
    import json
    
    # 创建producer对象
    producer = KafkaProducer(
                                value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # 对发送的数据进行序列化处理
                                bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092']  # 安装了kafka的集群
                             )
    for i in range(10):
        # 创建 data
        data={
            "name":"李四",
            "age":23,
            "gender":"男",
            "id":i
        }
    
        # 将data发送到kafka,主题'test_topic'(自定义)
        producer.send('test_topic', data)
    
    producer.close()
    

    消费者

    from kafka import KafkaConsumer
    import json
    
    # 建立消费者对象
    consumer = KafkaConsumer('test_topic',  # 与消费者中发送消息的 topic对应
                             bootstrap_servers=['192.168.0.189:9092','192.168.0.190:9092','192.168.0.191:9092'],
                             value_deserializer=json.loads  # 反序列化数据
                             )
    
    # 生产者中send()一次数据,消费者中就会接收到一次数据,所以需要遍历
    for message in consumer:
        print(message.value)  # 通过.value方法获取到值
    
    consumer.close()
    

    注:有时候建立 生产者 或消费者 对象时会报错,反复多试几次就可以建立成功,具体什么原因还得多研究,后续补充

    参考链接

  • 相关阅读:
    损失函数相关
    半监督学习
    自监督学习
    leetcode相关
    深度学习中的Normalization
    TCN
    用户行为序列相关
    损失函数loss相关
    MapReduce编程之实例分析:wordCount
    MapReduce编程之初学概念篇
  • 原文地址:https://www.cnblogs.com/jaysonteng/p/14182755.html
Copyright © 2011-2022 走看看