zoukankan      html  css  js  c++  java
  • python kafka

    kafka简介(摘自百度百科)

    简介:

    kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。

    特性:
    通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
    高吞吐量[2]  :即使是非常普通的硬件Kafka也可以支持每秒数百万[2]  的消息
    支持通过Kafka服务器和消费机集群来分区消息
    支持Hadoop并行数据加载

    术语:
    Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
    Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
    Partition
    Partition是物理上的概念,每个Topic包含一个或多个Partition.
    Producer
    负责发布消息到Kafka broker
    Consumer
    消息消费者,向Kafka broker读取消息的客户端。
    Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

    一、安装

    在pypi.python.org有很多关于操作kafka的组件,我们选择weight最高的kafka 1.3.5
    1、有网的情况下执行如下命令安装:
    pip install kafka
    easy_install kafka

    2、无网的情况下把源码下载下来,上传到需要安装的主机
    压缩包:kafka-1.3.5.tar.gz
    解压: tar xvf kafka-1.3.5.tar.gz
    执行安装命令:   cd kafka-1.3.5   
                   python setup.py install
                   
    如安装报依赖错误,需要把依赖的组件也下载下来,然后进行安装,同样的方法,不赘述!

    二、按照官网的样例,先跑一个应用

    1、生产者:

    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])  #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
    
    for i in range(3):
        msg = "msg%d" % i
        producer.send('test', msg)
    producer.close()

    2、消费者(简单demo):

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('test',
                             bootstrap_servers=['172.21.10.136:9092'])
                             
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    启动后生产者、消费者可以正常消费。

    3、消费者(消费群组)

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('test',
                             group_id='my-group',
                             bootstrap_servers=['172.21.10.136:9092'])
                             
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力

    4、消费者(读取目前最早可读的消息)

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('test',
                             auto_offset_reset='earliest',
                             bootstrap_servers=['172.21.10.136:9092'])
                             
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
    源码定义:{'smallest': 'earliest', 'largest': 'latest'}

    5、消费者(手动设置偏移量)

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    consumer = KafkaConsumer('test',
                             bootstrap_servers=['172.21.10.136:9092'])
    
    print consumer.partitions_for_topic("test")  #获取test主题的分区信息
    print consumer.topics()  #获取主题列表
    print consumer.subscription()  #获取当前消费者订阅的主题
    print consumer.assignment()  #获取当前消费者topic、分区信息
    print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
    consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    6、消费者(订阅多个主题)

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
    print consumer.topics()
    print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    7、消费者(手动拉取消息)

    from kafka import KafkaConsumer
    import time
    
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test','test0'))
    while True:
        msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
        print msg
        time.sleep(1)

    8、消费者(消息挂起与恢复)

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time
    
    consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
    consumer.subscribe(topics=('test'))
    consumer.topics()
    consumer.pause(TopicPartition(topic=u'test', partition=0))
    num = 0
    while True:
        print num
        print consumer.paused()   #获取当前挂起的消费者
        msg = consumer.poll(timeout_ms=5)
        print msg
        time.sleep(2)
        num = num + 1
        if num == 10:
            print "resume..."
            consumer.resume(TopicPartition(topic=u'test', partition=0))
            print "resume......"

    pause执行后,consumer不能读取,直到调用resume后恢复。

  • 相关阅读:
    关于oracle分页出现数据重复的问题
    VSCode如何配置中文环境
    POI导出excel执行自动求和
    IE浏览器不支持document.getElementsByClassName的解决办法
    原生JavaScript实现切换tab显示不同的样式
    easyUI日期控件只选择月份和日期
    POI导出Excel的两种方法
    python Django 学习笔记(四)—— 使用MySQL数据库
    python Django 学习笔记(三)—— 模版的使用
    python Django 学习笔记(二)—— 一个简单的网页
  • 原文地址:https://www.cnblogs.com/wangbaihan/p/9467892.html
Copyright © 2011-2022 走看看