zoukankan      html  css  js  c++  java
  • Kafka 简单实验二(Python实现简单生产者消费者)

    Apache Kafka 是什么?

    Kafka 是一个开源的分布式流处理平台,其简化了不同数据系统的集成。流指的是一个数据管道,应用能够通过流不断地接收数据。Kafka 作为流处理系统主要有两个用处:

    1. 数据集成: Kafka 捕捉事件流或数据变化流,并将这些数据送给其它数据系统,如关系型数据库,键值对数据库或者数据仓库。
    2. 流处理:Kafka接收事件流并保存在一个只能追加的队列里,该队列称为日志(log)。日志里的信息是不可变的,因此支持连续实时的数据处理和流转换,并使结果在系统级别可访问。

    相比于其它技术,Kafka 拥有更高的吞吐量,内置分区,副本和容错率。这些使得 Kafka 成为大规模消息处理应用的良好解决方案。

    Kafka 系统有三个主要的部分:

    1. 生产者(Producer): 产生原始数据的服务。
    2. 中间人(Broker): Kafka 是生产者和消费者之间的中间人,它使用API来获取和发布数据。
    3. 消费者(Consumer): 使用中间人发布的数据的服务。

    安装 Kafka

      见 Kafka 简单实验一

    配置环境

    我们的项目将包括:

      生产者:将字符串发送给 Kafka   消费者: 获取数据并展示在终端窗口中  Kafka: 作为中间人

    安装需要的依赖包

    pip install kafka-python
    

    创建生产者

    生产者是给 Kafka 中间人发送消息的服务。值得注意的是,生产者并不关注最终消费或加载数据的消费者。 创建生产者: 创建一个 producer.py 文件并添加如下代码:

    import time
    from kafka import SimpleProducer, KafkaClient
    
    #  connect to Kafka
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)
    # Assign a topic
    topic = 'my-topic'

    创建消息:

      循环生成1到100之间的数字

    发送消息:

      Kafka 消息是二进制字符串格式(byte)

    以下是完整的生产者代码:

    import time
    from kafka import SimpleProducer, KafkaClient
    
    #  connect to Kafka
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)
    # Assign a topic
    topic = 'my-topic'
    
    def test():
        print('begin')
        n = 1
        while (n<=100):
            producer.send_messages(topic, str(n))
            print "send" + str(n)
            n += 1
            time.sleep(0.5)
        print('done')
    
    if __name__ == '__main__':
        test()

    创建消费者

    消费者监听并消费来自 Kafka 中间人的消息。我们的消费者应该监听 my-topic 主题的消息并将消息展示。

    以下是消费者代码(consumer.py):

    from kafka import KafkaConsumer
    
    #connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer('my-topic')
    
    print "begin"
    for msg in consumer:
        print msg

    运行项目

    确保 Kafka 在运行
    打开两个终端,在第一个终端中运行消费者:

    $ python consumer.py

    在第二个终端运行生产者:

    $ python producer.py
  • 相关阅读:
    Android Material Design控件使用(2)——FloatButton TextInputEditText TextInputLayout 按钮和输入框
    HbuilderX 常用快捷键
    Android Material Design控件使用(一)——ConstraintLayout 约束布局
    git 命令行 修改文件 并push(阿里云)
    【http转https】其之二:申请Let's Encrypt颁发SSL证书
    git: fatal: Could not read from remote repository
    Android Vector曲折的兼容之路
    Gradle Build速度加快终极方法(android studio)
    android studio 更新 Gradle错误解决方法(Gradle sync failed)
    开发Android必知的工具
  • 原文地址:https://www.cnblogs.com/zhangtianyuan/p/7655904.html
Copyright © 2011-2022 走看看