zoukankan      html  css  js  c++  java
  • 存储Kafka消息[Python版]

    前段时间,由于业务需要对Kafka消息从两方面进行存储,一方面离线分批存储,另一方面在线分布式存储。在存储Kafka消息前,需要了解Kafka工作原理

    1 离线分批存储

    将消费到的Kafka数据打包分批存储到本地

    from pykafka import KafkaClient
    import json
    import time
    
    client = KafkaClient(hosts="xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092")
    topic = client.topics['test']
    consumer = topic.get_simple_consumer(
        consumer_group="logGroup",
        reset_offset_on_start=True
    )
    
    review_list = []
    
    while True:
        for message in consumer:
            if message is not None:
                #print(message.offset, message.value)
                #print(message.value)
                #print(type(message.value)) #<class 'bytes'>
                message.value = str(message.value).strip('b').strip("'")
                print(type(message.value)) #<class 'str'>
                message.value = message.value.replace("\n", "") #去除
    
    
                message.value = eval(message.value)
                print(type(message.value)) #<class 'dict'>
    
                print(message.value)            
                review_list.append(message.value)
                #print(review_list)
                if len(review_list) == 20000:
                    now = time.strftime("%Y_%m_%d-%H_%M_%S",time.localtime(time.time()))
                    print(now)
                    fname="/lujing"+now+r".json"
                    with open(fname, 'w') as f:
                        
                        json.dump(review_list, f)
                    
                    review_list = []
                    print(len(review_list)==20000)
                    now = time.strftime("%Y_%m_%d-%H_%M_%S",time.localtime(time.time()))
                    print(now)
    
                    continue
    

    2 在线分布式存储

    直接将消费到的Kafka消息写入HBase

    # -*- coding: utf-8 -*-
    """
    Created on Tue Nov 5 14:12:09 2019
    
    @author: yoyoyo
    """
    
    
    from kafka import KafkaConsumer
    import happybase
    import json
    import time
    
    hbase_ip='localhost'
    hbase_port=9090
    ip = hbase_ip
    port = hbase_port
    pool = happybase.ConnectionPool(size=1, host=ip) # create connection
    
    # table insert data
    def hbase_load(tableName, lists):
        with pool.connection() as connection:
            connection.open()    #start transport
        if tableName not in str(connection.tables()):
            create_table(connection, tableName)
        table = connection.table(tableName)
        b = table.batch(batch_size=1024) # batchs inserts data
        for li in lists:
            try:
                rowkey = li['xxx']
                data_dicts = {}
                for d, x in li.items():
                    key = "timeStamp:" + d
                    value = str(x)
                    data_dicts[key] = value
                    b.put(row=rowkey, data=data_dicts)
                    b.send()
                    print("rowkey:" + rowkey + " data append success")
            except Exception as ex:
                print(str(ex) + " data append failure.")
        connection.close()
    
    # create table
    def create_table(conn, table):
        try:
            conn.create_table(
                table,
                {
                    "timeStamp": dict(max_versions=10)
                }
            )
        except Exception as ex:
            print(str(ex) + " table exists.")
    # print log
    def log(str):
        t = time.strftime(r"%Y-%m-%d %H:%M:%S", time.localtime())
        print("[%s]%s" % (t, str))
    lst = []
    log('start consumer')
    consumer = KafkaConsumer('test', group_id='logGroup', bootstrap_servers=['xx.xx.xx.xx:9092']) # HBase集群master ip
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        log(recv)
        dict_data = json.loads(msg.value, strict = False)
        dict_data['xxx'] = str(dict_data['xxx'])+'-'+dict_data['xxx']
        lst.append(dict_data)
        hbase_load('kafka2Hbase_test', lst)
    

    服务器调试在线分布式存储时报错处理

    错误1 连接Hbase出现的错误

    thriftpy2.transport.TTransportException: TTransportException(type=1, message=“Could not connect to (‘localhost’, 9090)”
    

    原因分析
    Hbase是用Java写的,原生地提供了Java接口,如果用其他语言,需要开启连接原生提供的thrift接口服务器。也就是python使用HappyBase操作HBase需要先开启thrift接口服务器。

    解决办法并查看进程

    (base) root@master:/opt/hbase/hbase-1.2.10/bin# hbase-daemon.sh start thrift2
    
    (base) root@master:~# jps
    3266 QuorumPeerMain
    4772 HMaster
    996 Kafka
    4102 ResourceManager
    4231 NodeManager
    11850 ThriftServer
    12048 Jps
    3890 SecondaryNameNode
    3703 DataNode
    3578 NameNode
    

    错误2 loads时报错
    JSONDecodeError: invalid control character
    解决办法:json.loads(json, strict=False) #关掉strict即可

  • 相关阅读:
    video兼容ie,ckplayer网页播放器
    边框在2个边,不重叠不接触的情况下是梯形。
    【Unity】关于屏幕自适应的思路
    【Unity】鼠标指向某物体,在其上显示物体的名字等等等等信息
    【C#】关于左移/右移运算符的使用
    【Unity】鼠标点选物体
    Python time和datetime模块
    Python 模块之间的调用
    SaltStack 使用pillar安装配置管理zabbix
    SaltStack 实践课程一
  • 原文地址:https://www.cnblogs.com/eugene0/p/12548121.html
Copyright © 2011-2022 走看看