zoukankan      html  css  js  c++  java
  • python实现-kafka作为消息中间件 -实现数据生产和消费-实用的脚本

    转自:https://www.cnblogs.com/DJRemix/p/11383103.html
    import socket

    import json
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    #kafka的配置文件:(bootstrap_servers:kafka的集群地址,topic_name:主题,consumer_id:消费分组)
    KAFKA_SETTING = {
    'bootstrap_servers': ["172.24.181.182:9092", "172.24.181.181:9092", "172.24.181.183:9092"],
    'topic_name': 'user_data',
    'consumer_id': 'consumer_ai'
    }
    conf= KAFKA_SETTING

    print("[setting] =", conf)


    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
    api_version = (0,10),
    retries=5)

    partitions = producer.partitions_for(conf['topic_name'])
    print('Topic下分区: %s' % partitions)
    #需要推送的数据:(推送到kafka的数据类型必须的json类型)
    user_data = {
    "appToken": "d23ea83dbf7c411aa36e5ab519f41818",
    "appId": "JF_WK_001",
    "mobile": "15950857927",
    "isRealTimeReturn": True,
    "applyTime": 15100226057,
    "uuid": "a91140f54b898w85d7a50d4b95994",
    "customerNo": 1153265851
    }

    send_data = bytes(json.dumps(user_data), encoding="utf-8")

    try:
    future = producer.send(conf['topic_name'], send_data)
    future.get()
    print('send message succeed.')
    except KafkaError as e:
    print('send message failed. [e] ='),

    import socket
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError


    KAFKA_SETTING = {
    'bootstrap_servers': ["172.24.181.182:9092", "172.24.181.181:9092", "172.24.181.183:9092"],
    'topic_name': 'result_data',
    'topic_name_user': 'user_data',
    'consumer_id': 'consumer_ai'
    }

    conf = KAFKA_SETTING

    consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
    group_id=conf['consumer_id'],
    api_version = (0,10))

    print('consumer start to consuming...')
    consumer.subscribe((conf['topic_name'], ))
    from IPython import embed
    # embed()

    print("consumer = ", consumer)
    for message in consumer:
    print(message.topic, message.offset, message.key, message.value, message.partition)
  • 相关阅读:
    STM32使用keil串口输出中文乱码问题
    STM32CUBEMX忘记配置sys中的debug导致程序只能下载一次的问题
    远渡重洋的开源之路我是买家项目
    其实我就是个技术迷自身定位及展望
    五一上海行
    The Secret 秘密 读书笔记
    MySQL数据库设计复习笔记及项目实战
    PHP可调试团队开发环境配置支持企业级开发
    WIN7下QQ概念版使用手记
    Memento
  • 原文地址:https://www.cnblogs.com/yangjintao/p/11384728.html
Copyright © 2011-2022 走看看