zoukankan      html  css  js  c++  java
  • Python MongoDB 合表

    一、原始表结构

    1、imsi表

    MongoDB Enterprise > db.trs_action_dzwl_zm.findOne()
    {
            "_id" : {
                    "imsi" : "460029380018855",
                    "start_time" : "2019-03-13 15:37:07"
            },
            "site_address" : "织里-大港路与G318交叉口",
            "xnetbar_wacode" : "EG-MIX-WL-4C-006",
            "imei" : "000000052052052",
            "device_longitude" : "120.275424",
            "device_latitude" : "30.838656",
            "tmsi" : "1552462627",
            "rssi" : "140",
            "band" : "40",
            "plmn" : "46000",
            "tel_number" : "1595028",
            "device_name" : "织里-大港路与G318交叉口-4G",
            "vendor_name" : "南京森根",
            "province" : "江苏省",
            "city" : "盐城市"
    }

    2、car表

    MongoDB Enterprise > db.trs_action_car_info.findOne()
    {
            "_id" : {
                    "license_number" : "苏A39NX7",
                    "start_time" : "2019-05-16 23:03:13"
            },
            "site_address" : "湖织大道-香圩桥东侧",
            "site_location_id" : "",
            "unlawful_act" : "",
            "driving_direct" : "其它",
            "lane_id" : "001",
            "netbar_wacode" : "904",
            "license_color" : "002",
            "photo_cnt" : "",
            "monitor_type" : "卡口式监控",
            "photo_path" : "/pic?did=12ffaa00-78a3-1037-921c-54c4150760be&bid=486472&pid=4294966623&ptime=1558018994",
            "speed" : "0",
            "stat" : "0",
            "vehicle_brand1" : "0",
            "vehicle_brand2" : "0",
            "car_length" : "",
            "car_color" : "其它颜色",
            "shade" : "000",
            "car_type" : "轿车",
            "license_type" : "92式民用车",
            "vehicle_feature_path" : "",
            "device_name" : "湖织大道-香圩桥东侧",
            "monitor_direct" : "未知",
            "lane" : "001",
            "device_longitude" : "120.308512",
            "device_latitude" : "30.881026",
            "site_name" : "湖织大道-香圩桥东侧",
            "road_segment_direct" : "未知",
            "site_longitude" : "120.308512",
            "site_latitude" : "30.881026"
    }

    3、face表

    MongoDB Enterprise > db.trs_action_face_info.findOne()
    {
            "_id" : {
                    "pid" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
                    "start_time" : "2019-06-13 12:32:59"
            },
            "site_address" : "融泰宾馆",
            "img_mode" : "",
            "obj_img_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42",
            "quality_score" : "0.883593",
            "netbar_wacode" : "33052802001310942740",
            "device_name" : "融泰宾馆",
            "device_longitude" : "120.262211",
            "device_latitude" : "30.841749",
            "age" : "",
            "gender" : "1",
            "race" : "",
            "beard" : "",
            "eye_open" : "",
            "eye_glass" : "",
            "sun_glass" : "1",
            "mask" : "",
            "mouth_open" : "",
            "smile" : "1",
            "similarity" : "0.97059",
            "image_id" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
            "bkg_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42"
    }

    4、MAC表

    二、合表后collectionsitetime结构

    要求:将imsi、car、face、MAC(MAC暂时不合)四张表,将表中一些关键字段提取出来

    1)以站点

    2)以两分钟为间隔

    3)一个document中,两分钟内最多只存200个关键数据

    MongoDB Enterprise > db.collecsites.findOne()
    {
            "_id" : ObjectId("5e159ef831d840f9482b2adc"),
            "timeline" : "2019-03-13 15:34:00",
            "site" : "织里-大港路与G318交叉口",
            "face" : [ ],
            "lpn" : [ ],
            "mac" : [ ],
            "nsamples" : 200,
            "imsi" : [
                    {
                            "start_time" : "2019-03-13 15:35:56",
                            "imsi" : "460078995442766"
                    },
                    {
                            "start_time" : "2019-03-13 15:35:56",
                            "imsi" : "460006254007976"
                    }
            ]
    }

    三、开发脚本

    1、使用到python模块

    from multiprocessing import Pool(进程池)

    from pymongo import MongoClient(python连接mongodb驱动)

    import pandas as pd(将一段时间划分为多个时间段,本例子以2分钟一个时间段)

    2、脚本

    1)连接mongodb的脚本

    [root@mongodb07 python3]# cat mongodbclient.py
    #coding=utf-8
    from multiprocessing import Pool
    import os, time, random
    import json
    from datetime import datetime
    from pymongo import MongoClient
    import sys
    import datetime
    class Database(object):
        def __init__(self, address, port, database):
            self.conn = MongoClient(host=address, port=port)
            self.db = self.conn[database]
        def get_state(self):
            return self.conn is not None and self.db is not None
        def insert_one(self, collection, data):
            if self.get_state():
                ret = self.db[collection].insert_one(data)
                return ret.inserted_id
            else:
                return ""
        def insert_many(self, collection, data):
            if self.get_state():
                ret = self.db[collection].insert_many(data)
                return ret.inserted_id
            else:
                return ""
        def update(self, collection, data):
            # data format:
            # {key:[old_data,new_data]}
            data_filter = {}
            data_revised = {}
            for key in data.keys():
                data_filter[key] = data[key][0]
                data_revised[key] = data[key][1]
            if self.get_state():
                return self.db[collection].update_many(data_filter, {"$set": data_revised}).modified_count
            return 0
        def updateOne(self, collection, data_filter,data_revised):
            if self.get_state():
                return self.db[collection].update(data_filter,data_revised,True)
            return 0

        def find(self, col, condition, column=None):
            if self.get_state():
                if column is None:
                    return self.db[col].find(condition)
                else:
                    return self.db[col].find(condition, column)
            else:
                return None
        def aggregate(self, col, condition):
            if self.get_state():
                options = {'allowDiskUse':True}
                result=self.db[col].aggregate(condition,**options)
                return result
            else:
                return None

        def delete(self, col, condition):
            if self.get_state():
                return self.db[col].delete_many(filter=condition).deleted_count
            return 0
        def close_connect(self):
            self.conn.close()
            #return 'mongo连接已关闭'
    2)对mongodb中collection做实际操作的脚本
    [root@mongodb07 python3]# cat collection_curd.py
    #coding:utf-8
    from multiprocessing import Pool
    import os, time, random
    import json
    from datetime import datetime
    from pymongo import MongoClient
    import sys
    import datetime
    import mongodbclient
    import pandas as pd
    def max_number(num1,num2,num3):   ##获取最大值
        max_num=max(num1,num2,num3)
        return max_num
    def site_cursor_to_list(myresult,colum):  ##将mongodb输出的cursor转换为python的list
        sitelist=[]
        for i in myresult:
            sitelist.append(i[colum])
        return sitelist
    def list_Duplicate_removal(inlist):   ##去除重复值
        outlist=list(set(inlist))
        return outlist
    def get_time_interval(str_start_time,str_end_time):  ##以2分钟为单位,将输入的时间范围切分
        time_interval=pd.date_range(str_start_time, str_end_time,freq='2 Min')
        return time_interval
    def get_site(collection_name,str_start_time,str_end_time):  ##获取2分钟内imsi/face/lpn/mac的站点名称
        db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
        myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time}})
        db.close_connect()
        return site_cursor_to_list(myresult,"site_address")
    def get_site_data(collection_name,str_start_time,str_end_time,site,colums):  ##根据条件:2分钟的起止时间、站点名、集合名、字段名,获取所需数据
        db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
        myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time},"site_address":site},colums)
        db.close_connect()
        return myresult

    def sitetime_insert(collection_name,site,str_start_time,imsi_sitetime,face_sitetime,car_sitetime,mac_sitetime):  ##将数据插入集合
        db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
        db.insert_one(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,"imsi":imsi_sitetime,"face":face_sitetime,"lpn":car_sitetime,"mac":mac_sitetime})
        db.close_connect()
    def sitetime_updateOne(collection_name,site,str_start_time,key,value):   ##将数据更新到集合中
        db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
        db.updateOne(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,key:[]},{"$set":{key:value}})
        db.close_connect()
    #def sit_colse():
    #    db.close_connect()
     
    3)操作脚本
    [root@mongodb07 python3]# cat collection_insert.py
    #coding:utf-8
    from multiprocessing import Pool
    import os, time, random
    import json
    from datetime import datetime
    from pymongo import MongoClient
    import sys
    import datetime
    import mongodbclient
    import pandas as pd
    import collection_curd as curd
    from multiprocessing import Pool
    #update_exec(imsi_outlen_flo,"collecsites",'imsi',imsidata,imsi_outlen_int,imsi_max_len)
    def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
        if type_outlen_flo <=1.0:
            curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist)
        else:
            for x in range(type_outlen_int+1):
                if x==type_outlen_int:
                    curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:type_max_len])
                    #print(typelist)
                else:
                    curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:(x+1)*200])
                    #print(typelist)
    def data_exec(nums,time_interval):
        #start = time.time()
        #print("start_time : ",start)
        #time_interval=curd.get_time_interval('20190310','20191230')
        #for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
        #print("start : ",nums)
        str_start_time = datetime.datetime.strftime(time_interval[nums],'%Y-%m-%d %H:%M:%S')  ##时间切片,每个切片的开始时间
        str_end_time = datetime.datetime.strftime(time_interval[nums+1],'%Y-%m-%d %H:%M:%S')  ##时间切片,每个切片的结束时间
        #print(str_start_time,'   ',str_end_time)
        #print("########################")
        #time.sleep(5)
        #exit()
        #sitelist=[]
        myresult_imsi_sit=curd.get_site("trs_action_dzwl_zm",str_start_time,str_end_time) ##获取2分钟内imsi的站点名称,并将站点名带入下面的循环
        myresult_car_sit=curd.get_site("trs_action_car_info",str_start_time,str_end_time) ##获取2分钟内car的站点名称,并将站点名带入下面的循环
        myresult_face_sit=curd.get_site("trs_action_face_info",str_start_time,str_end_time) ##获取2分钟内face的站点名称,并将站点名带入下面的循环
        myresult=myresult_imsi_sit+myresult_car_sit+myresult_face_sit
        #print(myresult)
        myresult=curd.list_Duplicate_removal(myresult) ##获取去重后的所有站点
        #print(myresult)
        #exit()
        if not myresult:
            pass
        else:
            for i in range(len(myresult)):
                site=myresult[i]
                #print(site)
                my_imsi_site_data=curd.get_site_data("trs_action_dzwl_zm",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据imsi
                my_car_site_data=curd.get_site_data("trs_action_car_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据car
                my_face_site_data=curd.get_site_data("trs_action_face_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据face
                imsidata=curd.site_cursor_to_list(my_imsi_site_data,"_id")
                cardata=curd.site_cursor_to_list(my_car_site_data,"_id")
                facedata=curd.site_cursor_to_list(my_face_site_data,"_id")
                #print(imsidata)
                imsi_outlen_int=len(imsidata)/200
                imsi_outlen_flo=len(imsidata)/200.0
                car_outlen_int=len(cardata)/200
                face_outlen_int=len(facedata)/200
                car_outlen_flo=len(cardata)/200.0
                face_outlen_flo=len(facedata)/200.0
                car_max_len=len(cardata)
                face_max_len=len(facedata)
                imsi_max_len=len(imsidata)
                #print("car_max_len:",car_outlen_int," ","face_max_len:",face_outlen_int," ","imsi_max_len:",imsi_outlen_int)
                max_mod_200=max(imsi_outlen_int,car_outlen_int,face_outlen_int)+1
                #print(max_mod_200)
                if imsi_outlen_flo>imsi_outlen_int or car_outlen_flo>car_outlen_int or face_outlen_flo>face_outlen_int:
                    for i in range(max_mod_200):
                        curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
                else:
                    for i in range(max_mod_200-1):
                        curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
                update_exec(imsi_outlen_flo,"collecsites",site,str_start_time,'imsi',imsidata,imsi_outlen_int,imsi_max_len)
                update_exec(car_outlen_flo,"collecsites",site,str_start_time,'lpn',cardata,car_outlen_int,car_max_len)
                update_exec(face_outlen_flo,"collecsites",site,str_start_time,'face',facedata,face_outlen_int,face_max_len)
                #print(site)
                #exit()
                #curd.sit_colse
        #def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
        #end = time.time()
        #print("end_time : ",end)
        #print('ALL Insert Task runs %s(ms).' % ((end - start)*1000))

    if __name__ == '__main__':
        start = time.time()
        p=Pool(30)
        #print("start_time : ",start)
        time_interval=curd.get_time_interval('20190310','20191230')
        for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
            #print(i)
            #res=p.apply_async(data_exec,args=(i,))
            result=p.apply_async(data_exec, args=(i,time_interval))
        p.close()
        p.join()
        end = time.time()
        print("end_time : ",end)
        print('ALL Insert Task runs %s(ms).' % ((end - start)*1000))
     

    四、开发中遇到的问题

    1、如何将一段时间按照两分钟进行划分
    2、实例化mongodb连接后,在脚本运行中,连接如何close
    3、进程线程池的使用
  • 相关阅读:
    Excel 如何复制粘贴一整行
    如何修改文件的扩展名(后缀)
    中文乱码之myEclipse项目导入时中文乱码(待)
    如何在java中导入jar包
    如何在myEclipse中创建配置文件,比如:XXX.properties
    Postman安装教程
    API是什么?——回答:接口。(待)
    找回J2EE 之再学习打卡记录
    让外界可以访问电脑上的网站的几种方式——花生壳,域名,IIS(待)
    18、任务暂停挂起
  • 原文地址:https://www.cnblogs.com/xibuhaohao/p/12167940.html
Copyright © 2011-2022 走看看