zoukankan      html  css  js  c++  java
  • 使用Python进行 kafka的生产与消费

    kafka的生产与消费

    在生产前需要  .需要创建一个topic,和消费的的groupid

    比如可以在kafka管理系统中创建,不需要手动敲命令创建

    1.创建topic和绑定消费组

     2.kafka的生产

    import json
    from kafka import KafkaProducer
    
    
    topic_name="active_user_simplified"  #生产的topic
    kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092'
    
    
    def create_kafaka():
        """
        写入kafka数据,useractive数据,如果用户活跃,useractiveUpdata服务就会消费
        :return:
        """
        producer = KafkaProducer(bootstrap_servers=kafka_addr,
                                 value_serializer=lambda m: json.dumps(m).encode())
        for i in range(1):
    
            data = {
                "app_key": "07b6ed26c8bcce540204c8f7",
                "uid": 8004540429,
                "platform": "W",
                "type": "active_terminate",
                "itime": 1608977776
            }
            result=producer.send(topic_name, data)
    
            print(result)
    
    
    create_kafaka()

    3.kafka的消费

    from kafka import KafkaConsumer
    
    topic_name="active_user_simplified"
    kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092'
    
    
    
    def consumer_kafaka():
        group_id = "group-segment-useractiveUpdate"
        consumer = KafkaConsumer(topic_name,
                                 bootstrap_servers=kafka_addr,
                                 group_id=group_id,
                                 auto_offset_reset='earliest')
        for msg in consumer:
            print(msg.value)
    
    
    
    consumer_kafaka()

     消费数据的日志

  • 相关阅读:
    JSTL之迭代标签库
    java中的IO流
    浅谈代理模式
    TreeSet集合
    网络编程之UDP协议
    Java中的多线程
    Java中的面向对象
    JavaScript 函数表达式
    JavaScript 数据属性和访问器属性
    JavaScript 正则表达式语法
  • 原文地址:https://www.cnblogs.com/chongyou/p/14184261.html
Copyright © 2011-2022 走看看