zoukankan      html  css  js  c++  java
  • kafka 生产,消费的的几个小问题

    1. 生产的代码比较简单:
    import json
    from kafka import KafkaProducer
    from decimal import Decimal
    import decimal
    
    class DecimalEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o, decimal.Decimal):
                return float(o)
            super(DecimalEncoder, self).default(o)
    
    def producer():
        host_port = 'localhost:9095'
        topic = '1111'
        producer = KafkaProducer(bootstrap_servers=[host_port], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        msg_dict = {
            "sleep_time": 10,
            "db_config": {
                "database": "test_1",
                "host": "xxxx",
                "user": "root",
                "password": "root"
            },
            "table": "msg",
            "msg": "Hello World"
        }
        msg_dict = {'price_cost': Decimal('6729.0716'), 'location_id': 193, 'product_id': 842892}
    
        msg = json.dumps(msg_dict,cls=DecimalEncoder)
        producer.send(topic, msg, partition=0)
        print('send success')
        producer.close()
    
    
    if __name__ == '__main__':
        producer()
    
    1. 消费的代码涉及到的东西比较多,个人觉得群组功能挺不错的:
    import json
    from pykafka import KafkaClient
    def customer2():
    
        hosts = 'localhost:9095,localhost:9092'
        client = KafkaClient(hosts=hosts)
        topic = client.topics['1111']
        consumer = topic.get_simple_consumer(consumer_group=b'123456', auto_commit_interval_ms=1,
                                             auto_commit_enable=True)
        for msg in consumer:
            a = json.loads(msg.value,encoding='utf-8')
            print(a)
    if __name__ == '__main__':
        customer2()
    

    问答:

    1. kafka,在没有消费端消费消息的情况下,生产消息,启动消费端获取不到数据,在启动消费端后推送消息又可以获取到数据?
      消费端使用pykafka 而不是使用kafka,使用get_simple_consumer方法来获取数据,基本上每次指定一个新的group都会获取前边所有历史数据
    2. 消费端指定多个host?
      host使用逗号隔开.
  • 相关阅读:
    cache 元素 数据类型类(1)
    cache 存储数据访问
    cache类的元素
    COS(cache objectscript)语言及语法cache对象及对象类型
    cache创建数据库
    cache 元素 数据类型类(2)
    usaco1.3.3 Calf Flac 我的题解
    USACO the castle
    Ordered Fractions usaco
    【转】IBM Rational Rose 操作指南(下)
  • 原文地址:https://www.cnblogs.com/qianxunman/p/13750128.html
Copyright © 2011-2022 走看看