zoukankan      html  css  js  c++  java
  • python连接kafka生产者,消费者脚本

    # -*- coding: utf-8 -*-
    
    '''
        使用 kafka-python 1.3.5 模块
        # pip install kafka==1.3.5
        # pip install kafka-python==1.3.5
    '''
    
    import sys
    import time
    import json
    
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    
    
    # Connection settings passed by main() to the producer and consumer wrappers.
    # NOTE(review): "KAFAKA" looks like a misspelling of "KAFKA"; the names are
    # left as-is because main() references them.
    # NOTE(review): the broker address is hard-coded; presumably a test broker —
    # confirm before reuse.
    KAFAKA_HOST = "101.236.51.235"  # broker host, combined with the port as "host:port"
    KAFAKA_PORT = 9092  # broker port
    KAFAKA_TOPIC = "test"  # topic that is both produced to and consumed from
    
    
    class Kafka_producer():
        """Producer wrapper: sends JSON messages to one topic under one key.

        The key supplied at construction time is attached to every message
        sent through :meth:`sendjsondata`.
        """

        def __init__(self, kafkahost, kafkaport, kafkatopic, key):
            """Build a ``KafkaProducer`` bootstrapped from ``kafkahost:kafkaport``.

            Args:
                kafkahost: Broker host name or address.
                kafkaport: Broker port.
                kafkatopic: Topic every message is sent to.
                key: Message key (text) attached to every message; may be
                    ``None`` to send messages without a key.
            """
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.key = key
            print("producer:h,p,t,k", kafkahost, kafkaport, kafkatopic, key)
            bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort,
            )
            print("boot svr:", bootstrap_servers)
            self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

        def sendjsondata(self, params):
            """Serialize ``params`` to JSON and send it to the configured topic.

            The payload is dumped with ``ensure_ascii=False`` and encoded as
            UTF-8; the producer is flushed after each send. A ``KafkaError``
            raised while sending is printed rather than propagated.

            Args:
                params: Any JSON-serializable object.
            """
            try:
                payload = json.dumps(params, ensure_ascii=False)
                print(payload)
                value = payload.encode('utf-8')
                # Use the key stored on the instance. The previous code read a
                # bare name `key`, which only resolved when the script's
                # __main__ block had created a global of that name.
                key = self.key.encode('utf-8') if self.key is not None else None
                print("send msg:(k,v)", key, value)
                self.producer.send(self.kafkatopic, key=key, value=value)
                self.producer.flush()
            except KafkaError as e:
                print(e)
    
    class Kafka_consumer():
        """Consumer wrapper: reads messages from one topic as part of a group."""

        def __init__(self, kafkahost, kafkaport, kafkatopic, groupid, key=None):
            """Build a ``KafkaConsumer`` subscribed to ``kafkatopic``.

            Args:
                kafkahost: Broker host name or address.
                kafkaport: Broker port.
                kafkatopic: Topic to consume from.
                groupid: Consumer group id passed to ``KafkaConsumer``.
                key: Optional value stored on the instance as ``self.key``; it
                    is not used by this class. The previous code read an
                    undeclared name `key` here, which only resolved when a
                    global of that name existed.
            """
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.groupid = groupid
            self.key = key
            bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort,
            )
            self.consumer = KafkaConsumer(
                self.kafkatopic,
                group_id=self.groupid,
                bootstrap_servers=bootstrap_servers,
            )

        def consume_data(self):
            """Yield each message produced by iterating the underlying consumer.

            A ``KeyboardInterrupt`` raised during iteration is printed and ends
            the generator instead of propagating.
            """
            try:
                for message in self.consumer:
                    yield message
            except KeyboardInterrupt as e:
                print(e)
    
    
    def main(xtype, group, key):
        """Run a producer or consumer smoke test against the configured broker.

        Args:
            xtype: ``"p"`` to produce 100 messages, one per second, or ``"c"``
                to consume and print messages. Any other value does nothing.
            group: Consumer group id (used only when ``xtype == "c"``).
            key: Message key (used only when ``xtype == "p"``).
        """
        if xtype == "p":
            # Producer mode.
            producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
            print("===========> producer:", producer)
            for _id in range(100):
                # The earlier string-formatted payload was overwritten before
                # use, so only the list payload is built.
                params = [{"msg0": _id}, {"msg1": _id}]
                producer.sendjsondata(params)
                time.sleep(1)
        elif xtype == 'c':
            # Consumer mode.
            consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
            print("===========> consumer:", consumer)
            for msg in consumer.consume_data():
                print('msg---------------->k,v', msg.key, msg.value)
                print('offset---------------->', msg.offset)
    
    if __name__ == '__main__':
        # Command line: <p|c> <group> <key>. Fail with a usage message instead of
        # a bare IndexError when arguments are missing.
        if len(sys.argv) < 4:
            sys.exit("usage: python {0} <p|c> <group> <key>".format(sys.argv[0]))
        # Assigned to the same module-level names as before, so any code that
        # reads them as globals keeps working.
        xtype = sys.argv[1]
        group = sys.argv[2]
        key = sys.argv[3]
        main(xtype, group, key)

    使用方式(参数依次为:模式 p=生产 / c=消费、消费组 groupid、消息 key)
    生产消息

    python testkafka.py p g k
    消费消息

    python testkafka.py c g k

  • 相关阅读:
    C++学习9 this指针详解
    福建省第八届 Triangles
    UVA 11584 Partitioning by Palindromes
    POJ 2752 Seek the Name, Seek the Fame
    UVA 11437 Triangle Fun
    UVA 11488 Hyper Prefix Sets (字典树)
    HDU 2988 Dark roads(kruskal模板题)
    HDU 1385 Minimum Transport Cost
    HDU 2112 HDU Today
    HDU 1548 A strange lift(最短路&&bfs)
  • 原文地址:https://www.cnblogs.com/reblue520/p/8270412.html
Copyright © 2011-2022 走看看