zoukankan      html  css  js  c++  java
  • Kafka 通过python简单的生产消费实现

    使用CentOS6.5、python3.6、kafkaScala 2.10  - kafka_2.10-0.8.2.2.tgz (asc, md5)

    一、下载kafka

    下载地址

    https://kafka.apache.org/downloads

    里面包含zookeeper

    二、安装Kafka

    1、安装zookeeper

    mkdir /root/kafka/

    tar -vzxf kafka_2.10-0.8.2.2

    cd /root/kafka/kafka_2.10-0.8.2.2

    cat  config/zookeeper.properties | grep -v '#' >> config/zk.properties

    mkdir -p /home/kafka/zk

    vi zk.properties
    dataDir=/home/kafka/zk  #因为zookeeper变更为zk,所以需要在这里修改一下

    启动zookeeper(后台启动)

    /root/kafka/kafka_2.10-0.8.2.2/bin/zookeeper-server-start.sh /root/kafka/kafka_2.10-0.8.2.2/config/zk.properties &

    2、安装Kafka

    cd /root/kafka/kafka_2.10-0.8.2.2

    cat config/server.properties | grep -v '#'  >> config/kafka_01.properties

    启动Kafka(后台启动)

    /root/kafka/kafka_2.10-0.8.2.2/bin/kafka-server-start.sh /root/kafka/kafka_2.10-0.8.2.2/config/kafka_01.properties &

    三、新建Kafka topic

    1、新建topic

    cd /root/kafka/kafka_2.10-0.8.2.2

    ./bin/kafka-topics.sh --create --zookeeper 192.168.50.33:2181 --replication-factor 1 --partitions 1 --topic test

    2、查看topic

    ./bin/kafka-topics.sh --list --zookeeper 192.168.50.33:2181

    四、kafka生产者脚本

    1、安装python的Kafka模块

    pip3 install kafka-python(之前已安装)

    2、kafka生产者脚本

    cat kafka_pro.py
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import json
    import time
    class Kafka_producer():
        '''
        使用kafka的生产模块
        '''
        def __init__(self, kafkahost, kafkaport, kafkatopic):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort
            ))
        def sendjsondata(self, params):
            try:
                parmas_message = json.dumps(params)
                producer = self.producer
                producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
                producer.flush()
            except KafkaError as e:
                print(e)

    def main():
        '''
        测试consumer和producer
        :return:
        '''
        # 测试生产模块
        producer = Kafka_producer("127.0.0.1",9092,"test")
        for i in range(1000000000000):
            params = 'test---' + str(i)
            print(params)
            producer.sendjsondata(params)
            time.sleep(1)

    if __name__ == '__main__':
        main()
        import os
        print(os.uname)

    五、kafka消费者脚本

    cat kafka_cust.py
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import json
    import time
    class Kafka_consumer():
        '''
        使用Kafka—python的消费模块
        '''
        def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
            self.kafkaHost = kafkahost
            self.kafkaPort = kafkaport
            self.kafkatopic = kafkatopic
            self.groupid = groupid
            self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
                                          bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                              kafka_host=self.kafkaHost,
                                              kafka_port=self.kafkaPort))
        def consume_data(self):
            try:
                for message in self.consumer:
                    # print json.loads(message.value)
                    yield message
            except KeyboardInterrupt as e:
                print(e)

    def main():
        '''
        测试consumer和producer
        :return:
        '''
        # 测试消费模块
        # 消费模块的返回格式为ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
        # imestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
        # serialized_key_size=-1, serialized_value_size=21)
        consumer = Kafka_consumer('127.0.0.1',
                                  9092,
                                  "test",
                                  'test-python-test')
        message = consumer.consume_data()
        for i in message:
            print(i.value)

    if __name__ == '__main__':
        main()
    整理自:
    https://www.cnblogs.com/hunttown/p/9041036.html
    https://gitee.com/jalright/scriptstodo/blob/master/kafka/producer.py
    https://gitee.com/jalright/scriptstodo/blob/master/kafka/cunsumer.py
  • 相关阅读:
    JCL: What is EXCP
    百分比布局的使用
    使用TabLayout快速实现一个导航栏
    彻底理解android中的内部存储与外部存储
    Eclipse的LogCat总是自动清空怎么办?
    怎么给Unity写一个原生的插件
    一句话、一张图记住Activity和Fragment之间错综复杂的生命周期关系
    2015年工作中遇到的问题:131-140(有图才有真相)
    2015年工作中遇到的问题:131-140(有图才有真相)
    雷观(二十四):谈谈我对国家事务“二胎”和“教育”的一些看法
  • 原文地址:https://www.cnblogs.com/xibuhaohao/p/11721187.html
Copyright © 2011-2022 走看看