zoukankan      html  css  js  c++  java
  • pika的阻塞式使用

    [root@cloudplatform ELK]# cat startIncHouTai.py

    import os
    
    # 杀掉内存中的进程 
    cmd='pgrep -f  PutDataToKafkaInc.py  | xargs kill -9 '
    os.system(cmd)
    
    # 后台启动进程
    cmd='nohup /usr/bin/python3 -u /usr/local/software/ELK/PutDataToKafkaInc.py >nohup.log 2>&1 &'
    os.system(cmd)
    
    print('增量脚本启动成功')

     增量.py

    import os
    import sys
    
    # pip install kafka-python
    sys.path.append("/usr/local/software/ELK")
    from Util.DateEncoder import *
    from kafka import KafkaProducer
    from Util.TextUtil import *
    from Util.MySQLHelper import *
    from Util.GetAreaPartionIdUtil import *
    from Util.RabbitMqUtil import *
    
    # 区域码
    AreaCode = mysql_AreaCode
    MemoryDict = []
    
    
    # 处理某张表中ID的数据变更
    def doAction(sql):
        while True:
            try:
                dt = db.query(sql)
                if len(dt) > 0:
                    # 将字段大写转为小写
                    for row in dt:
                        new_dics = {}
                        for k, v in row.items():
                            new_dics[k.lower()] = v
                    jstr = json.dumps(new_dics, cls=DateEncoder)
                    logInfo('准备上报kafka,数据:'+jstr)
                    # 声明Kafka生产者
                    producer = KafkaProducer(bootstrap_servers=kafka_servers)
                    producer.send(topic=topicName, partition=partitionId, value=jstr.encode('utf-8'))
                    # 提交一下
                    producer.flush()
                    # 关闭kafkaProducer
                    producer.close()
                    logInfo('成功完成上报工作!')
                    break
            except Exception as err:
                logInfo('是不是没有连接上kafka??将休息3秒...'+str(err))
                time.sleep(3)
    
    
    # 通过表名,找到对应的配置文件,读取这个表的PK是什么
    def GetPk(table):
        file = os.getcwd().replace('\', '/') + '/Sql/' + table + '.json'
        if os.path.exists(file):
            jsonStr = ReadContent(file)
            obj = json.loads(jsonStr)
            # 主键
            pk = obj['pk']
            return pk
        else:
            logInfo('文件:' + file + "不存在,程序无法继续!")
            sys.exit()
    
    
    # 通过表名和PK的真实值,拼接出SQL语句
    def GetSql(table, id):
        file = os.getcwd().replace('\', '/') + '/Sql/' + table + '.json'
        if os.path.exists(file):
            jsonStr = ReadContent(file)
            obj = json.loads(jsonStr)
            # 主键
            pk = obj['pk']
            sql = str(obj['sql']).replace('>', '=')
            sql = sql.replace("#area_code#", AreaCode).replace("order by t1.#pk#", "").replace("order by t2.#pk#",
                                                                                               "").replace(
                "order by t3.#pk#", "").replace("order by t4.#pk#", "").replace("order by #pk#", "").replace("#pk#",
                                                                                                             pk).replace(
                "#id#", str(id)).replace("#limit#", "")
            return sql
        else:
            logInfo('文件:' + file + "不存在,程序无法继续!")
            sys.exit()
    
    
    # 解析rabbitmq中的json数据,知道当前变化的是哪个ID
    def RabbitMqId(dataJson, pk):
        if dataJson['event'] == 'insert':
            return int(dataJson['columns'][pk]['value'])
        elif dataJson['event'] == 'update':
            return int(dataJson['before']['columns'][pk]['value'])
        else:
            logInfo('不是insert也不是update,这是不行的,程序无法执行!')
            return 0
    
    
    # 黄海定义的输出信息的办法,带当前时间
    def logInfo(msg):
        i = datetime.datetime.now()
        print(" %s            %s" % (i, msg))
    
    
    if __name__ == '__main__':
        logInfo('开始获取主机对应的PartitionId...')
        partitionId = GetAreaPartitionId()
        logInfo('成功获取主机的对应PartitionId=' + str(partitionId))
        # 队列名
        queue_Name = 'kafka_queue'
        # 交换机名
        switch_Name = 'elk_switch'
        # 声明mysql数据库
        db = MySQLHelper()
    
        # 统一的topic名称
        topicName = 'dsideal_db'
    
        while True:
            try:
                # 准备连接到Rabbitmq
                logInfo("正在连接到RabbitMQ...")
                credentials = pika.PlainCredentials(RabbitMq_User, RabbitMq_Password)
                connection = pika.BlockingConnection(pika.ConnectionParameters(RabbitMq_IP, int(RabbitMq_Port), '/', credentials))
                channel = connection.channel()
                logInfo("成功连接到RabbitMQ!正在阻塞等待消息..")
                for method_frame, properties, body in channel.consume(queue_Name):
                    # 阻塞式获取消息
                    # 将body转为json
                    logInfo("发现RabbiMQ中的消息!正在处理...")
                    dataJson = json.loads(body.decode(encoding='utf-8').lower())
                    logInfo('dataJson='+str(dataJson))
    
                    tableName = str(dataJson['table'])
                    event = str(dataJson['event'])
                    # 1、获取pk是什么字段名称
                    pk = GetPk(tableName)
                    logInfo("成功获取PK="+pk)
    
                    # 2、解析rabbitmq中的json数据,知道这个主键的对应真实值是什么?
                    id = RabbitMqId(dataJson, pk)
                    logInfo("成功获取ID="+str(id))
    
                    # 3、组装查询的sql
                    sql = GetSql(tableName, id)
                    # 4、处理这个表中ID的数据变更
                    logInfo("成功获取SQL="+sql)
                    # 成功处理
                    doAction(sql)
                    logInfo('成功处理' + tableName + '' + event + "事件一条!id=" + str(id))
    
                    # 确认收到这条消息
                    channel.basic_ack(method_frame.delivery_tag)
                    logInfo("成功设置ACK标识!")
                    break
                requeued_messages = channel.cancel()
                connection.close()
            except Exception as err:
                logInfo("发生了异常:"+str(err)+",将休息10秒...")
                time.sleep(10000)
  • 相关阅读:
    两句话的区别在于目录的不同。
    关于系统的操作全在这里了。这个看起来很重要。
    屏幕坐标的方法
    改变轴心的操作。
    关于旋转的变换处理方法。
    对其位置
    点边同事移除的办法处理。
    移动的坐标变换
    判断文件是否存在的函数。
    把节点归零处理
  • 原文地址:https://www.cnblogs.com/littlehb/p/9188847.html
Copyright © 2011-2022 走看看