zoukankan      html  css  js  c++  java
  • py kafka

    # https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/consumer.py
    #生产者
    import json
    from kafka import KafkaProducer
    from confluent_kafka import Producer
    
    msg_dict = {
        "interval": 10,
        "producer": {
            "name": "producer 1",
            "host": "10.10.10.1",
            "user": "root",
            "password": "root"
        },
        "cpu": "33.5%",
        "mem": "77%",
        "msg": "Hello kafka",
        "data": "测试",
    }
    
    
    def test():
        producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')  # 连接kafka
    
        msg = "Hello World".encode('utf-8')  # 发送内容,必须是bytes类型
    
        # msg = {"data": 1}
        # producer.send('mytopic', json.dumps(msg_dict).encode("utf-8"))  # 发送的topic为test
        producer.send('mytopic', json.dumps(msg_dict))  # 发送的topic为test
        producer.close()
    
    
    # p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
    p = Producer({'bootstrap.servers': '127.0.0.1:9092,mybroker2'})
    
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    
    # some_data_source = [str(i) + "*" for i in range(100)]
    some_data_source = [json.dumps(msg_dict) for i in range(5)]
    for data in some_data_source:
        # Trigger any available delivery report callbacks from previous produce() calls
        p.poll(0.1)
        # 异步通信  triggered(触发) delivered(递送)
        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
    
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered. 等待任何未完成的消息被传递,并触发传递报告回调。
    p.flush()
    
    if __name__ == '__main__':
        # test()
        pass
    
    
    
    # 消费者
    
    import json
    from kafka import KafkaConsumer
    from confluent_kafka.avro import AvroProducer, AvroConsumer
    from confluent_kafka import Consumer, KafkaError
    
    
    def one():
        consumer = KafkaConsumer('mytopic', bootstrap_servers=['127.0.0.1:9092'])
        for msg in consumer:
            # print(type(msg.value))
            # print(str(msg.value, encoding="utf8"))
            print(msg.value.decode())
            recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value.decode("utf-8"))
            print("...")
            print(recv)
    
    
    bootstrap_servers = '127.0.0.1:9092,xxx'
    
    
    def two():
        client = AvroProducer({
            'bootstrap.servers': bootstrap_servers,
            'schema.registry.url': "",
        })
        value = {"test": "value"}
        key = {"test": "key"}
        client.produce(topic="mytopic", value=value, key=key)
        client.flush()
    
    
    def three():
        c = Consumer({
            # 'bootstrap.servers': 'mybroker',
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'mygroup2',
            'auto.offset.reset': 'earliest'
        })
    
        c.subscribe(['mytopic'])
    
        while True:
            msg = c.poll(1.0)
    
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print(type(msg.value()))
            print(msg.value())
            print(json.loads(msg.value().decode('utf-8')))
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    
        c.close()
    
    
    if __name__ == '__main__':
        # one()
        # two()
        three()
    
    
  • 相关阅读:
    手动安装vue-devtools
    redis随记
    JS时间格式化
    360自动抢票还不够,几行js代码设置无人值守
    HttpWebRequest请求返回非200的时候 HttpWebResponse怎么接受返回错误提示
    android发编译
    asprise-ocr-api-sample 高价收破解版64 32位
    (16)集合操作
    (15)字典操作
    (14)字符串
  • 原文地址:https://www.cnblogs.com/lajiao/p/11989849.html
Copyright © 2011-2022 走看看