zoukankan      html  css  js  c++  java
  • kafka生产、消费py脚本

    原文:https://www.cnblogs.com/reblue520/p/8270412.html

    # -*- coding: utf-8 -*-
    
    '''''
        使用kafka-Python 1.3.3模块
        # pip install kafka==1.3.5
        # pip install kafka-python==1.3.5
    '''
    
    import sys
    import time
    import json
    
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    
    
    KAFAKA_HOST = "101.236.51.235"
    KAFAKA_PORT = 9092
    KAFAKA_TOPIC = "test"
    
    
    class Kafka_producer():
        '''''
        生产模块:根据不同的key,区分消息
        '''
    
        def __init__(self, kafkahost,kafkaport, kafkatopic, key):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.key = key
            print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)
            bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                    kafka_host=self.kafkaHost,
                    kafka_port=self.kafkaPort
                    )
            print("boot svr:",bootstrap_servers)
            self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers
                    )
    
        def sendjsondata(self, params):
            try:
                parmas_message = json.dumps(params,ensure_ascii=False)
                producer = self.producer
                print(parmas_message)
                v = parmas_message.encode('utf-8')
                k = key.encode('utf-8')
                print("send msg:(k,v)",k,v)
                producer.send(self.kafkatopic, key=k, value= v)
                producer.flush()
            except KafkaError as e:
                print (e)
    
    class Kafka_consumer():
        '''''
        消费模块: 通过不同groupid消费topic里面的消息
        '''
    
        def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.groupid = groupid
            self.key = key
            self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                    bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                        kafka_host=self.kafkaHost,
                        kafka_port=self.kafkaPort )
                    )
    
        def consume_data(self):
            try:
                for message in self.consumer:
                    yield message
            except KeyboardInterrupt as e:
                print (e)
    
    
    def main(xtype, group, key):
        '''''
        测试consumer和producer
        '''
        if xtype == "p":
            # 生产模块
            producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
            print ("===========> producer:", producer)
            for _id in range(100):
               params = '{"msg" : "%s"}' % str(_id)
               params=[{"msg0" :_id},{"msg1" :_id}]
               producer.sendjsondata(params)
               time.sleep(1)
    
        if xtype == 'c':
            # 消费模块
            consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
            print ("===========> consumer:", consumer)
            message = consumer.consume_data()
            for msg in message:
                print ('msg---------------->k,v', msg.key,msg.value)
                print ('offset---------------->', msg.offset)
    
    if __name__ == '__main__':
        xtype = sys.argv[1]
        group = sys.argv[2]
        key = sys.argv[3]
        main(xtype, group, key)
  • 相关阅读:
    牛客小白月赛12 D 月月给华华出题 (欧拉函数,数论,线筛)
    牛客小白月赛12 F 华华开始学信息学 (分块+树状数组)
    牛客小白月赛12 C 华华给月月出题 (积性函数,线性筛)
    牛客小白月赛12 I 华华和月月逛公园 (tarjian 求桥)
    Tourist's Notes CodeForces
    Educational Codeforces Round 71 (Rated for Div. 2) E XOR Guessing (二进制分组,交互)
    Tunnel Warfare HDU
    蓝桥杯第三届总决赛
    HDU 1695(数论,筛选+素因子分解+容斥)
    lightoj 1248 Dice (III)(几何分布+期望)
  • 原文地址:https://www.cnblogs.com/hancece/p/11906276.html
Copyright © 2011-2022 走看看