zoukankan      html  css  js  c++  java
  • kafka python

    python-kafka 参考: https://blog.csdn.net/see_you_see_me/article/details/78468421

     python  kafka 的 api 的基本操作:  https://www.cppentry.com/bencandy.php?fid=120&id=207208

    操作 比较全: https://blog.csdn.net/qq_41262248/article/details/80790918

    kafka 配置详解:

    https://blog.csdn.net/weixin_40596016/article/details/79562023

    依赖: 

       pip  install  kafka-python

    发送数据:


    1)、命令行方式---普通的发送方式


     from kafka import KafkaProducer
     producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092')
     for _ in range(100):
    ... producer.send('world',b'some_message_bytes')
    .. producer.close().

    上面的几行功能分别是:

    导入KafkaProducer

    创建连接到192.168.120.11:9092这个Broker的Producer,

    循环向world这个Topic发送100个消息,消息内容都是some_message_bytes',这种发送方式不指定Partition,kafka会均匀大把这些消息分别写入5个Partiton里面,


    更详细的说明可以参考 https://kafka-python.readthedocs.io/en/master/index.html

    2)、命令行方式---发送json字符串
    json作为一种强大的文本格式,已经得到非常普遍的应用,kafak-python也支持发送json格式的消息

    其实如果你参考https://kafka-python.readthedocs.io/en/master/index.html这里的KafkaProducer里面的发送json

    producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('world', {'key1': 'value1'})
    producer.close()

    4)、命令行方式--发送压缩字符串

     producer = KafkaProducer(bootstrap_servers='192.168.18.129:9092',compression_type='gzip')

    上面都是测试各个命令的使用,接下来,我们写一个完整的脚本,这个脚本的功能是把指定目录下的文件名发送到world这个topic

    file_monitor.py脚本

    #-*- coding: utf-8 -*-
    
    from kafka import KafkaProducer
    import json
    import os
    import time
    from sys import argv
    
    producer = KafkaProducer(bootstrap_servers='192.168.18.129:9092')
    
    def log(str):
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S",time.localtime())
    print("[%s]%s"%(t,str))
    
    def list_file(path):
    dir_list = os.listdir(path);
    for f in dir_list:
    producer.send('world',f)
    producer.flush()
    log('send: %s' % (f))    
    
    list_file(argv[1])
    producer.close()
    log('done')


    假如我们要监控/opt/jdk1.8.0_91/lib/missioncontrol/features这个目录下的文件,可以这样执行

    python file_monitor.py  /opt/jdk1.8.0_91/lib/missioncontrol/features

    原文链接:https://blog.csdn.net/see_you_see_me/article/details/78468421

    2、创建Consumer
    通常使用Kafka时会创建不同的Topic,并且在Topic里面创建多个Partiton,因此作为Consumer,通常是连接到指定的Broker,指定的Topic来消费消息。

    完整的python 脚本

    consumer.py

    #-*- coding: utf-8 -*-
    
    from kafka import KafkaConsumer
    import time
    
    def log(str):
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S",time.localtime())
    print("[%s]%s"%(t,str))
    
    log('start consumer')
    #消费192.168.120.11:9092上的world 这个Topic,指定consumer group是consumer-20171017
    consumer=KafkaConsumer('world',group_id='consumer-20171017',bootstrap_servers=['192.168.120.11:9092'])
    for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key,msg.value)
    log(recv)


    原文链接:https://blog.csdn.net/see_you_see_me/article/details/78468421

    手动提交:

    #encoding=utf-8 
    from kafka.consumer.group import KafkaConsumer
    print("########################################################")
    print("##############   kafka consumer test 1  ################")
    print("########################################################")
    '''
    自动提交位移设为flase, 默认为取最新的偏移量,重新建立一个group_id,
    这样就实现了不影响别的应用程序消费数据,又能消费到最新数据,实现预警(先于用户发现)的目的。
    '''
    print("============= start consumer +++=========== ")
    consumer = KafkaConsumer("json",bootstrap_servers = ['192.168.18.129:9092'],
                            group_id ='test_group_json',
                            auto_offset_reset ='earliest', 
                            enable_auto_commit =False)
    
    for msg in consumer:
        offset = msg.offset
        print("offset"+str(offset))
        print("=============  consumering 2 +++=========== ")
        recv = "%s:%d:%d: key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key,msg.value)
        print(recv)
        consumer.commit()
        
    print("============= start consumer +++=========== ")

    5、消费者(手动设置偏移量)

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    
    consumer = KafkaConsumer('test',
                             bootstrap_servers=['172.21.10.136:9092'])
    
    
    print consumer.partitions_for_topic("test")  #获取test主题的分区信息
    print consumer.topics()  #获取主题列表
    print consumer.subscription()  #获取当前消费者订阅的主题
    print consumer.assignment()  #获取当前消费者topic、分区信息
    print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
    consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))


                                              
    6、消费者(订阅多个主题)

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
    print consumer.topics()
    print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    7、消费者(手动拉取消息)

    from kafka import KafkaConsumer
    import time
    
    
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test','test0'))
    while True:
        msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
        print msg
        time.sleep(1)
        
  • 相关阅读:
    java代码块执行顺序
    Oracle-SQL高级查询
    java单例模式
    Oracle序列和伪表
    Oracle函数
    Oracle存储过程
    Oracle触发器
    Oracle分析函数
    Oracle分页查询
    Oracle联合查询
  • 原文地址:https://www.cnblogs.com/lshan/p/11644839.html
Copyright © 2011-2022 走看看