zoukankan      html  css  js  c++  java
  • python中kafka生产者和消费者实现

    安装kafka-python:

    C:\anaconda3\Scripts>pip install kafka-python


    import datetime
    import json
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError

    class Kafka_producer():
        """Producer built on the kafka-python producer module.

        Holds one ``KafkaProducer`` and the topic every message is sent to.
        """

        def __init__(self, bootstrapServers, kafkaTopic):
            """Store the connection settings and create the ``KafkaProducer``.

            Args:
                bootstrapServers: Value passed to ``KafkaProducer`` as
                    ``bootstrap_servers``.
                kafkaTopic: Topic that ``sendjsondata`` sends to.
            """
            self.bootstrapServers = bootstrapServers
            self.kafkaTopic = kafkaTopic
            self.producer = KafkaProducer(bootstrap_servers=self.bootstrapServers)

        def sendjsondata(self, params):
            """Send *params* to the configured topic as UTF-8 encoded JSON.

            On success the value returned by the send future is printed
            together with the current local time (``YYYYmmddHHMMSS``).
            A ``KafkaError`` is printed instead of being raised.

            Args:
                params: Any object ``json.dumps`` can serialize.

            Raises:
                TypeError: If *params* is not JSON serializable (not caught here).
            """
            try:
                params_message = json.dumps(params)
                producer = self.producer
                future = producer.send(self.kafkaTopic, params_message.encode('utf-8'))
                producer.flush()
                # Wait for the result of the send; the timeout is 10 (seconds,
                # per kafka-python's future API).
                recordMetadata = future.get(timeout=10)
                print(recordMetadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
            except KafkaError as e:
                print(e)

    class Kafka_consumer():
        """Consumer built on the kafka-python consumer module.

        Holds one ``KafkaConsumer`` subscribed to a single topic within a
        consumer group.
        """

        def __init__(self, bootstrapServers, kafkaTopic, groupId):
            """Store the connection settings and create the ``KafkaConsumer``.

            Args:
                bootstrapServers: Value passed to ``KafkaConsumer`` as
                    ``bootstrap_servers``.
                kafkaTopic: Topic to consume from.
                groupId: Value passed to ``KafkaConsumer`` as ``group_id``.
            """
            self.kafkaTopic = kafkaTopic
            self.bootstrapServers = bootstrapServers
            self.groupId = groupId
            self.consumer = KafkaConsumer(self.kafkaTopic, group_id=self.groupId, bootstrap_servers=self.bootstrapServers)

        def consume_data(self):
            """Yield each message produced by iterating ``self.consumer``.

            An ordinary exception raised while iterating is printed and ends
            the generator instead of propagating to the caller.
            """
            try:
                for message in self.consumer:
                    yield message
            # Exception rather than BaseException: closing the generator
            # (GeneratorExit) must not print anything, and KeyboardInterrupt /
            # SystemExit must reach the caller instead of being swallowed.
            except Exception as e:
                print(e)

    # Demo: send five messages to the topic, then print every message consumed from it.
    if __name__ == '__main__':
        # Placeholder broker addresses and topic name; replace with real values.
        bootstrapServers = ['ip1:port1', 'ip2:port2', 'ip3:port3']
        topicStr = '主题'

        print('-' * 20)
        print('生产者')
        print('-' * 20)

        producer = Kafka_producer(bootstrapServers, topicStr)
        # `index` instead of `id` so the builtin id() is not shadowed.
        for index in range(5):
            params = '{tst}:{null}---' + str(index)
            producer.sendjsondata(params)

        print('-' * 20)
        print('消费者')
        print('-' * 20)

        # Placeholder consumer group name.
        groupId = 'group名称'
        consumer = Kafka_consumer(bootstrapServers, topicStr, groupId)
        message = consumer.consume_data()
        # Runs for as long as the consumer keeps yielding messages.
        for i in message:
            print(i.value)


  • 相关阅读:
    [LUOGU] 1364 医院设置
    [POJ] 3278 Catch That Cow
    [OpenJudge] 2727 仙岛寻药
    [POJ] 2386 Lake Counting
    [POJ]1118 Lining up
    [LUOGU]1141 01迷宫
    [POJ]1111 Image Perimeters
    python之路——初识函数
    python----------文件操作
    Python中的split()函数的用法
  • 原文地址:https://www.cnblogs.com/WebLinuxStudy/p/12692830.html
Copyright © 2011-2022 走看看