zoukankan      html  css  js  c++  java
  • python读取kafka,输出到Vertica数据库

    # 主测试
    # https://docs.python.org/2/library/json.html
    import sys
    import json
    import vertica_python
    import time
    import os
    from pykafka import KafkaClient  # 导入的vertica_python和pykafka包需要pip install安装
    
    # 显示当前时间
    print('开始时间', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    print(sys.getdefaultencoding())
    client = KafkaClient(hosts="192.168.1.1:9092")  # 填写kafka地址和端口,一般是9092端口
    # client.topics  # 查看所有topic
    topic = client.topics[b'topic']  # 选择一个topic
    consumer = topic.get_simple_consumer(consumer_timeout_ms=2000, auto_commit_enable=1)  # 等待5秒无新数据,退出
    data_group = []
    conn_info = {'host': '192.168.1.1', 'port': 1, 'user': 'a', 'password': 'b',
                 'database': 'c', 'read_timeout': 600, 'unicode_error': 'strict', 'ssl': False}  # 填写数据库连接信息
    # simple connection, with manual close
    connection = vertica_python.connect(**conn_info)
    cur = connection.cursor()
    a_error_count = 0
    a_success_count = 0
    path_os = os.path.abspath('offset.txt') # 将数据偏移量offset写入文件
    f1 = open(path_os, 'r', encoding='utf8')
    a_offset_start = int(f1.readline())  # 从a_offset_start开始读数据
    print(a_offset_start)
    # a_offset_start = 3000 # 可以手工指定从哪里开始读取数据,排错用
    f1.close()
    for message in consumer:  # 循环0
        if message is not None and message.offset > a_offset_start:
            try:
                a = message.value.decode('UTF-8')
                data_group.append(json.loads(a))
                c = message.offset
                for item in data_group:
                    str1 = "insert into 表名(列名) values "+ "('" + str(c)  # 将offset值也写入数据库 
              + "'," + "'%s','%s'); " % ( item['列名1'], item['列名2']) print(str1) cur.execute(str1) connection.commit() a_success_count += 1 data_group.pop() except: print('error_message') a_error_count += 1 continue c1 = message.offset f = open(path_os, 'w+' , encoding='utf8') f.truncate() f.write(str(c1)) f.write(' ' + '开始时间=' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) f.write(' ' + 'a_success_count=' + str(a_success_count)) f.write(' ' + 'a_error_count=' + str(a_error_count)) f.close()
  • 相关阅读:
    1012 最大公约数和最小公倍数问题 2001年NOIP全国联赛普及组
    数论笔记
    Codevs 1200 同余方程 2012年NOIP全国联赛提高组
    Codevs 1213 解的个数(exgcd)
    Qbxt 模拟赛&&day-8
    NOIP 模拟赛 那些年,我们学过的文化课 --致已退役的fqk神犇.
    OI路上 day -9
    P3178 [HAOI2015]树上操作
    P3979 遥远的国度
    P4092 [HEOI2016/TJOI2016]树
  • 原文地址:https://www.cnblogs.com/castlevania/p/6428213.html
Copyright © 2011-2022 走看看