  • Sending to Kafka with Python

    Broadly speaking, there are three ways to send messages to Kafka from Python:

    1. Fire-and-forget (do not check whether the message actually arrived; ignore the return value)

    import pickle
    import time
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['ip:9092'],
                             key_serializer=lambda k: pickle.dumps(k),
                             value_serializer=lambda v: pickle.dumps(v))

    start_time = time.time()
    for i in range(0, 10000):
        print('------{}---------'.format(i))
        future = producer.send('test_topic', key='num', value=i, partition=0)

    # Push all buffered messages out to the broker
    producer.flush()
    producer.close()

    end_time = time.time()
    time_counts = end_time - start_time
    print(time_counts)

    2. Synchronous send (call get() to wait for Kafka's response and determine whether the send succeeded)

    import pickle
    import time
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(
        bootstrap_servers=['ip:9092'],
        key_serializer=lambda k: pickle.dumps(k),
        value_serializer=lambda v: pickle.dumps(v)
    )

    start_time = time.time()
    for i in range(0, 10000):
        print('------{}---------'.format(i))
        future = producer.send(topic="test_topic", key="num", value=i)
        # Block synchronously: calling get() on the future ensures the messages go out in order.
        try:
            record_metadata = future.get(timeout=10)
            # print(record_metadata.topic)
            # print(record_metadata.partition)
            # print(record_metadata.offset)
        except KafkaError as e:
            print(str(e))

    end_time = time.time()
    time_counts = end_time - start_time
    print(time_counts)

    3. Asynchronous send with callbacks (messages are sent asynchronously; callbacks report whether each send succeeded or failed)

    import pickle
    import time
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=['ip:9092'],
        key_serializer=lambda k: pickle.dumps(k),
        value_serializer=lambda v: pickle.dumps(v)
    )


    def on_send_success(*args, **kwargs):
        """
        Callback invoked when a send succeeds.
        :param args:
        :param kwargs:
        :return:
        """
        return args


    def on_send_error(*args, **kwargs):
        """
        Callback invoked when a send fails.
        :param args:
        :param kwargs:
        :return:
        """
        return args


    start_time = time.time()
    for i in range(0, 10000):
        print('------{}---------'.format(i))
        # On success the callback receives the RecordMetadata; on failure the errback receives the Exception.
        producer.send(
            topic="test_topic", key="num", value=i
        ).add_callback(on_send_success).add_errback(on_send_error)

    producer.flush()
    producer.close()

    end_time = time.time()
    time_counts = end_time - start_time
    print(time_counts)
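
    The callbacks above only return their arguments. As a minimal sketch of what they can do with what kafka-python passes them (the print statements are illustrative, not part of the original example), the success callback receives the RecordMetadata and the errback receives the exception; these are drop-in replacements for the two functions above:

    def on_send_success(record_metadata):
        # RecordMetadata of the acknowledged message
        print('delivered to {} partition {} at offset {}'.format(
            record_metadata.topic, record_metadata.partition, record_metadata.offset))


    def on_send_error(excp):
        # the exception that caused the send to fail
        print('delivery failed: {}'.format(excp))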

     In addition, you can also send a gzip-compressed data stream:

    import gzip
    from io import BytesIO

    from kafka import KafkaProducer


    def gzip_compress(msg_str):
        """Gzip-compress a str (or bytes) message into a bytes payload."""
        try:
            if isinstance(msg_str, str):
                msg_str = msg_str.encode('utf-8')
            buf = BytesIO()
            with gzip.GzipFile(mode='wb', fileobj=buf) as f:
                f.write(msg_str)
            return buf.getvalue()
        except Exception as e:
            print("Gzip compression error: " + str(e))


    def gzip_uncompress(c_data):
        """Decompress a gzip payload back into bytes."""
        try:
            buf = BytesIO(c_data)
            with gzip.GzipFile(mode='rb', fileobj=buf) as f:
                return f.read()
        except Exception as e:
            print("Gzip decompression error: " + str(e))


    def send_kafka(topic_name, msg, key=None):
        if key is not None:
            producer = KafkaProducer(bootstrap_servers=["fdw8.fengjr.inc:9092", "fdw9.fengjr.inc:9092", "fdw10.fengjr.inc:9092"],
                                     key_serializer=gzip_compress, value_serializer=gzip_compress)
            r = producer.send(topic_name, value=msg, key=key)
        else:
            producer = KafkaProducer(bootstrap_servers=["fdw8.fengjr.inc:9092", "fdw9.fengjr.inc:9092", "fdw10.fengjr.inc:9092"],
                                     value_serializer=gzip_compress)
            r = producer.send(topic_name, value=msg)
        # producer.flush(timeout=5)
        producer.close(timeout=5)  # close() flushes pending messages before returning
        return r
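
    A quick way to sanity-check the helpers above is to round-trip a message locally before sending it; the topic name below is just a placeholder:

    # Minimal usage sketch for the helpers above ('demo_topic' is a placeholder topic name).
    payload = 'hello kafka'
    compressed = gzip_compress(payload)
    assert gzip_uncompress(compressed).decode('utf-8') == payload  # compress/uncompress round-trip

    # send_kafka compresses the value via the producer's value_serializer and returns the send future.
    future = send_kafka('demo_topic', payload)
    print(future)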

      

  • Original post: https://www.cnblogs.com/tigerzhouv587/p/11232398.html