zoukankan      html  css  js  c++  java
  • mongo 增量同步之 MongoShake(3) kafka python处理kafka oplog mongoUtils

    场景:  采用 MongoShake 同步数据到kafka, 然后用python 处理,  做etl , 或者其他操作:

    # 同步 db_sea_user  ,  db_tom_user   to db_all_user    (mongodb) , 并且添加db_name(customer)

     代码: 

    MongoOplogHandler.py
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # @Time    : 9/27/21 10:48 AM
    # @Author  : Sea
    # @File    : MongoOplogHandler.py
    # @Software: IntelliJ IDEA
    # #pip install kafka-python
    # ******************************
    from kafka.consumer.group import KafkaConsumer
    from backports.configparser.helpers import str
    import json
    
    from MongoUtils import MongoUtils
    
    topic = "mgtest1"
    bootstrap_servers = "192.168.18.51:9092"
    group_id = 'etl'
    mongo_utils = MongoUtils()
    
    
    def start_kafkaListener():
        consumer = KafkaConsumer(topic, bootstrap_servers=[bootstrap_servers],
                                 group_id=group_id,
                                 auto_offset_reset='earliest',
                                 enable_auto_commit=False)
        print("=============+++ start consumer +++=========== ")
        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            recv = "topic:%s  partition:%d    offset:%d" % (msg.topic, msg.partition, msg.offset)
            print(recv)
            # tb_value = msg.value.decode("utf-8").replace("true", "True").replace("false", "False").replace("null", "None")
            etl(msg.value)
            consumer.commit()
    
    
    def etl(value):
        value_dict = json.loads(value)
        print(type(value_dict))
        # print(value_dict)
        if value_dict:
            # i:insert u:update   d:delete(only get _id)
            op = value_dict["op"]
            # db.table
            ns = value_dict["ns"]
            # ######filter some db or table which we need do #####
            db_tb = str(ns).split('.')
            db = db_tb[0]
            tb = db_tb[1]
            # if db == 'sea_user' or db == 'tom_user' or tb =='config':
            if '_user' in db:
                # etl tab_value
                tb_values = value_dict["o"]
                tb_values = json_json(tb_values)
                handler_result(op, ns, tb_values)
    
    
    # swich case
    def handler_result(op, ns, value):
        todo = {
            "i": insert,
            "u": update,
            "d": delete
        }
        method = todo.get(op, other)
        if method:
            method(ns, value)
    
    
    def other(ns, value):
        print("ns:" + ns + " v:" + str(value))
        print("other")
    
    
    def update(ns, value):
        print("ns:" + ns + " v:" + str(value))
        print("update")
        db_tb = str(ns).split('.')
        db = db_tb[0]
        tb = db_tb[1]
        # if db == 'sea_user' or db == 'tom_user':
        if '_user' in db:
            value['db'] = db
            mongo_utils.save_or_update_dict_by_id(tb, value, 'all_user')
    
    
    def insert(ns, value):
        print("ns:" + ns + " v:" + str(value))
        print("insert")
        db_tb = str(ns).split('.')
        print("**********************************" + str(db_tb))
        db = db_tb[0]
        tb = db_tb[1]
        # if db == 'sea_user' or db == 'tom_user':
        if '_user' in db:
            value['db'] = db  # add  from
            mongo_utils.save_or_update_dict_by_id(tb, value, 'all_user')
    
    
    def delete(ns, value):
        print("ns:" + ns + " v:" + str(value))
        print("delete")
        db_tb = str(ns).split('.')
        db = db_tb[0]
        tb = db_tb[1]
        # if db == 'sea_user' or db == 'tom_user':
        if '_user' in db:
            mongo_utils.delById(tb, value['_id'])
    
    
    '''
    k:节点name
    v:节点value
    cn:当前节点
    '''
    
    
    def setNode(k, v, cn):
        pass
        if type(v) == list:  # 无论是map还是list,都是list封装
            # get type
            if v:
                v_type = type(v[0])  # 子节点的type
                if v_type == dict:  # dict
                    c_value = {}
                    for v1 in v:
                        k2 = v1["Name"]
                        v2 = v1["Value"]
                        # v_value[v1["Name"]] = v1["Value"]
                        setNode(k2, v2, c_value)
                    cn[k] = c_value
                if v_type == list:  # list
                    c_value_dict = {}  # list_dict
                    c_value_list = []  # list list
                    for v2 in v:  # list 子节点可能为list,和 dict
                        if type(v2) == dict:
                            # v2 is dict
                            k2 = v2["Name"]
                            v2 = v2["Value"]
                            setNode(k2, v2, c_value_dict)
                        if type(v2) == list:
                            # # TODO if list in dict,here just add it
                            # c_value_list.append(v2)
                            pass
                            set_list_list(v2, c_value_list)
                    if c_value_list:
                        cn[k] = c_value_list
                    if c_value_dict:
                        cn[k] = c_value_dict
                if (not v_type == list) and (not v_type == dict):  # 子节点的节点为普通类型
                    cn[k] = v
        else:  # 普通节点
            cn[k] = v
    
    
    '''
    nodelist: list 节点
    cnode: 当前节点_list
    '''
    
    
    def set_list_list(nodelist, cnode):
        c_value_dict = {}  # list_dict
        c_value_list = []  # list list
        c_value_other = []  # list other
        for v2 in nodelist:  # list 子节点可能为list,和 dict
            if type(v2) == dict:  # list dict
                # v2 is dict
                k2 = v2["Name"]
                v2 = v2["Value"]
                setNode(k2, v2, c_value_dict)
            if type(v2) == list:  # list list
                set_list_list(v2, c_value_list)
            if (not type(v2) == list) and (not type(v2) == dict):
                c_value_other.append(v2)
        if c_value_list:
            cnode.append(c_value_list)
        if c_value_dict:
            cnode.append(c_value_dict)
        if (not c_value_dict) or (not c_value_list):
            cnode.append(c_value_other)
    
    
    '''
    dict to dict
    '''
    
    
    def json_json(tb_value_json_dict):
        etl_dict = {}
        for kv in tb_value_json_dict:
            setNode(kv["Name"], kv["Value"], etl_dict)
        return etl_dict
    
    
    """
        json_str to python dict or list
    """
    
    
    def json_to_json(tb_value_json_str):
        etl_dict = {}
        etl_list = []
        tb_value_json = json.loads(tb_value_json_str)
        if type(tb_value_json) == list:
            for tb_value_dict in tb_value_json:
                node_dict = {}
                for kv in tb_value_dict:
                    setNode(kv["Name"], kv["Value"], node_dict)
                if node_dict:
                    etl_list.append(node_dict)
            return etl_list
        if type(tb_value_json) == dict:
            for kv in tb_value_json:
                setNode(kv["Name"], kv["Value"], etl_dict)
            return etl_dict
    
    
    if __name__ == '__main__':
        start_kafkaListener()
    MongoUtils:
    import pymongo
    from bson import ObjectId
    class MongoUtils:
    
        def __init__(self):
            # host = MONGO_CONFIG['MONGODB_HOST']
            # port = MONGO_CONFIG['MONGODB_PORT']
            # dbname = MONGO_CONFIG['MONGODB_DBNAME']
            # mongo_user = MONGO_CONFIG['MONGODB_USER']
            # mongo_password = MONGO_CONFIG['MONGODB_PASSWORD']
            # client = pymongo.MongoClient(host=host, port=port)
            self.client = pymongo.MongoClient("mongodb://root:root@192.168.18.176:27019/?authSource=admin&replicaSet=rs&readPreference=primary&appname=MongoDB%20Compass&ssl=false")
            # 指向指定的数据库
            # self.mdb = client["all_user"]
            # self.mdb.authenticate(mongo_user, mongo_password)
            # 获取数据库里存放数据的表名
            # self.post = mdb["categoryname"]
    
        def insert_json_str(self, table, str_json_data, db='all_user'):
            mdb = self.client[str(db)]
            post = mdb[str(table)]
            post.insert(dict(str_json_data))
    
        def insert_dict(self, table, dicts, db='all_user'):
            mdb = self.client[str(db)]
            post = mdb[str(table)]
            post.insert_one(dicts)
    
        def save_or_update_dict_by_id(self, table, dicts, db='all_user'):
            mdb = self.client[str(db)]
            post = mdb[str(table)]
            post.find_one_and_replace({"_id": dicts['_id']}, dicts, upsert=True)
            # post.update_one({"_id":"1111111"},{'$set':{"s1sss":"12"}},upsert=True)
    
        def drop_table(self, table, db='all_user'):
            mdb = self.client[str(db)]
            post = mdb[str(table)]
            post.drop()
    
    
        def find(self,table, query={}, db='all_user'):
            """ query = {'name':'sea'}  return list """
            mdb = self.client[str(db)]
            tb = mdb[str(table)]
            return tb.find(query)
    
        def findone(self, table, query={}, db='all_user'):
            """ query = {'name':'sea'}  return list """
            mdb = self.client[str(db)]
            tb = mdb[str(table)]
            return tb.find_one(query)
    
        def delById(self, table, id, db='all_user'):
            mdb = self.client[str(db)]
            tb = mdb[str(table)]
            tb.delete_one({"_id": id})
    
        def __del__(self):
            self.client.close()
    
    
    
    if __name__ == '__main__':
        mongo_utils = MongoUtils()
        # user_dict = {"name": "sea"}
        # # mongo_utils.insert_dict("user",user_dict)
        # # mongo_utils.delById("user",'616f7befdcc2ceea4c6f1559')
        # findone = mongo_utils.findone("user", {"_id": ObjectId("616f7e1fcee92cff1561cd92")})
        # print(findone)
        # mongo_utils.delById('user',ObjectId("616f7e1fcee92cff1561cd92"))
        mongo_utils.client[str('all_user')][str('user')].find_one_and_replace({"_id":"1232131"},{"_id":"1232131","s1sss":"1"},upsert=True)
        ## save or update ##
        # mongo_utils.client[str('all_user')][str('user')].update_one({"_id":"1111111"},{'$set':{"s1sss":"12"}},upsert=True)
  • 相关阅读:
    PHP 数据类型
    PHP SAPI
    PHP 基础架构
    PHP7的变化
    mysql 选择优化的数据类型需要注意的几点
    彻底删除在github上提交的文件
    php7 新特性
    php缓冲区 一些笔记
    设计模式 一些概念
    mysql性能优化其中一部分小结
  • 原文地址:https://www.cnblogs.com/lshan/p/15344028.html
Copyright © 2011-2022 走看看