zoukankan      html  css  js  c++  java
  • Centos7下安装kafka,并使用python操作kafka的简单使用

    kafka(端口9091-9093)-zookeeper(端口2181-2183)集群配置,使用自带的


     第一步:安装jdk


     jdk的安装参考:https://www.cnblogs.com/smilecindy/p/13736470.html


     第二步:安装kafka并启动kafka


     (1)下载kafka

    直接从官网下载:wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz,进行解压安装

    也可以使用命令下载:wget  wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz

    (2)解压kafka: tar -zxvf  kafka_2.11-1.0.0.tgz

     

     (3)运行zookeeper(kafka自带的zookeeper):

    cd kafka_2.11-1.0.0/     # 打开kafka目录

    sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties   # 后台运行zookeeper

    (4)运行kafka:

    sh bin/kafka-server-start.sh config/server.properties # 运行kafka服务

    出现 “started” 则是启动成功:

    注意: 运行成功之后,该窗口不能关闭,该窗口为【运行服务窗口】


     第三步:新打开一个窗口,进行创建topic



    命令:sh bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test5


     第四步:创建监听test5的消息队列程序----(consumer【消费者窗口】)


     命令:sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test5 --from-beginning


     第五步:再先打开一个窗口,创建发送test5消息队列的生产者程序---(producer【生产者窗口】)


     命令:sh bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test5

    在>后输入信息并【enter】,则代表生产者发送了信息,如下图:

    回车之后,打开消费者端能够监听到生产者发送的消息,如下图:


     第六步:使用python操作kafka


     1.安装kafka的模块:pip install kafka-python;注意python3使用的是kafka-python

    2.新建一个kafka_consumer.py文件,并在【消费者窗口】执行该文件:

    from kafka import KafkaConsumer
    consumer = KafkaConsumer('test1', bootstrap_servers=['127.0.0.1:9092'])
    for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)

     3.新建一个kafka_producer.py文件,并在【生产者窗口】执行该文件:

    import json
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    msg_dict = {
    "status": {
    "code": 0,
    "message": "ok"
    },
    "data": [
    {
    "set": {
    "bk_set_name": "空闲机池",
    "bk_set_id": 10,
    "TopModuleName": "",
    "bk_parent_id": 3,
    "bk_service_status": "1",
    "bk_set_desc": "",
    "bk_set_env": "3",
    "description": "",
    "bk_capacity": 0,
    "subnet": "",
    "wan_gate": "",
    "node_name": "智能边缘云##空闲机池"
    },
    "room": None,
    "lake": None,
    "lake_list": []
    }
    ],
    "paging": {
    "total_page": 1,
    "page": 1,
    "per_page": 10,
    "total_record": 1
    }
    }
    msg = json.dumps(msg_dict)
    producer.send('test1', bytes(msg,'ascii'), partition=0)
    producer.close()

    文件执行成功之后,则回到消费者窗口,查看是否进行消费信息,如下图:

     
  • 相关阅读:
    3.2 playbook tags
    3.1 playbook语法实践
    3. playbook基础组件
    elasticsearch IK中文分词
    elasticsearch参数详解
    2. ansible常用模块
    1. ansible简介
    Python sphinx-build在Windows系统中生成Html文档
    Oracle PL/SQL Developer集成TFS进行团队脚本文件版本管理
    Gulp自动构建Web前端程序
  • 原文地址:https://www.cnblogs.com/smilecindy/p/13968283.html
Copyright © 2011-2022 走看看