zoukankan      html  css  js  c++  java
  • process_thread_action

    import psycopg2
    import threading
    
    # Module-level connection used by fetch_rows, update_rows and main below;
    # update_rows is called from the worker threads, so they all share it.
    # NOTE(review): host and credentials are hard-coded — consider reading them
    # from configuration or the environment.
    conn_fmac = psycopg2.connect(database='filter_useless_mac', user='user', password='password', host='192.168.168.168',
                                 port='5432')
    
    
    def fetch_rows(f_l):
        """Look up ``detail_data`` for each MAC value in ``f_l``.

        Issues one ``SELECT ... LIMIT 1`` per value against
        ``apiv2_single_mac_with_res`` on the module-level ``conn_fmac``
        connection.

        :param f_l: iterable of MAC values; each one is passed to the query as
            a bound parameter.
        :return: dict mapping every value that matched a row to that row's
            ``detail_data``; values without a matching row are left out.
        """
        r = {}
        with conn_fmac:
            with conn_fmac.cursor() as curs:
                for i in f_l:
                    # execute() arguments can only carry data values, never
                    # table or column names
                    # (http://initd.org/psycopg/docs/faq.html).
                    curs.execute('SELECT detail_data FROM apiv2_single_mac_with_res WHERE mac= %s LIMIT 1 ', (i,))

                    # The fetch is guarded because the original author hit
                    # "psycopg2.InternalError: current transaction is aborted,
                    # commands ignored until end of transaction block" here.
                    try:
                        t = curs.fetchone()
                        if t is not None:
                            r[i] = t[0]
                    except Exception as exc:
                        # Still best-effort (skip this value), but say why
                        # instead of dropping the error silently.
                        print(i, repr(exc))
                        continue
        return r
    
    
    def update_rows(id, new_val):
        """Write ``new_val`` into ``mac_with_res_position_lat_lon_unique_num`` for one row.

        The row is selected by ``oid_timestamp`` in
        ``control_group_with_compute_res`` and the statement runs on the
        module-level ``conn_fmac`` connection.

        :param id: ``oid_timestamp`` value identifying the row to update.
        :param new_val: value to store in the column.
        :return: None. Any exception raised by the UPDATE is caught and
            printed, so the caller keeps going with its next row.
        """
        with conn_fmac:
            with conn_fmac.cursor() as curs:
                try:
                    curs.execute(
                        'UPDATE control_group_with_compute_res SET mac_with_res_position_lat_lon_unique_num=%s WHERE oid_timestamp=%s',
                        (new_val, id))
                    print(threading.get_ident(), 'OK')
                except Exception as exc:
                    # Print the error that actually occurred; printing the bare
                    # ``Exception`` name only shows "<class 'Exception'>".
                    print(threading.get_ident(), repr(exc))
    
    
    class MyThread(threading.Thread):
        """Thread whose ``run`` calls ``func`` with ``args`` as one argument."""

        def __init__(self, func, args, name):
            super().__init__()
            # Set after the base initialiser so the auto-generated thread name
            # is replaced by the caller's.
            self.name = name
            self.func = func
            self.args = args

        def run(self):
            # ``args`` is passed through as a single positional argument; it is
            # not unpacked.
            target, payload = self.func, self.args
            target(payload)
    
    
    def _count_unique_positions(positions):
        """Return the number of distinct (longitude, latitude) pairs in ``positions``.

        ``positions`` is indexed by key and each entry is read through its
        ``'longitude'`` and ``'latitude'`` items.
        # presumably a dict decoded from a JSON column — confirm against the schema

        Pairs are compared by the string form of both coordinates, kept as a
        tuple so that two different pairs can never collapse into the same key
        (plain concatenation turns 113.5/22.1 and 113.52/2.1 into one string).
        """
        return len({(str(entry['longitude']), str(entry['latitude'])) for entry in positions.values()})


    def main():
        """Recompute the unique lat/lon count for every row that has positions.

        Reads ``oid_timestamp`` and ``mac_with_res_position`` for all rows of
        ``control_group_with_compute_res`` whose ``mac_with_res_position`` is
        not NULL, splits them into chunks of 200, and runs one ``MyThread`` per
        chunk. Each worker calls ``update_rows`` for every row in its chunk
        that has at least one position. Returns once all workers have finished.
        """
        with conn_fmac:
            with conn_fmac.cursor() as curs:
                curs.execute(
                    'SELECT oid_timestamp,mac_with_res_position FROM control_group_with_compute_res WHERE mac_with_res_position IS NOT NULL ORDER BY oid_timestamp DESC ')
                tuple_l = curs.fetchall()

        tn, tstep = len(tuple_l), 200

        def tf(ts):
            """Process the chunk of rows that starts at index ``ts``."""
            print(ts)
            # Slicing clamps at the end of the list, so the last chunk may
            # simply be shorter than ``tstep``.
            for oid_timestamp, mac_with_res_position in tuple_l[ts:ts + tstep]:
                n = _count_unique_positions(mac_with_res_position)
                if n > 0:
                    update_rows(oid_timestamp, n)

        # One worker per chunk start offset; MyThread passes the offset to tf
        # as its single argument.
        tl = [MyThread(tf, i, tf.__name__) for i in range(0, tn, tstep)]

        for t in tl:
            # Must be set through the ``daemon`` attribute before start();
            # assigning to ``setDaemon`` only overwrites the method.
            t.daemon = False
            t.start()
        for t in tl:
            t.join()
    
    
    # Run the batch update only when this file is executed as a script.
    if __name__ == '__main__':
        main()
    
    import json
    import psycopg2
    import threading
    
    # Module-level connection used by update_rows and main below; update_rows is
    # called from the worker threads, so they all share it.
    # NOTE(review): host and credentials are hard-coded — consider reading them
    # from configuration or the environment.
    conn_fmac = psycopg2.connect(database='filter_useless_mac', user='postgres', password='postgres', host='192.168.8.8',
                                 port='5432')
    
    def update_rows(id, new_val):
        """Write ``new_val`` into ``add_lat_lon_to_original_res`` for one row.

        The row is selected by ``oid_timestamp`` in
        ``control_group_with_compute_res`` and the statement runs on the
        module-level ``conn_fmac`` connection.

        :param id: ``oid_timestamp`` value identifying the row to update.
        :param new_val: value to store in the column.
        :return: None. Any exception raised by the UPDATE is caught and
            printed, so the caller keeps going with its next row.
        """
        with conn_fmac:
            with conn_fmac.cursor() as curs:
                try:
                    curs.execute(
                        'UPDATE control_group_with_compute_res SET add_lat_lon_to_original_res=%s WHERE oid_timestamp=%s',
                        (new_val, id))
                    print(threading.get_ident(), 'OK')
                except Exception as exc:
                    # Print the error that actually occurred; printing the bare
                    # ``Exception`` name only shows "<class 'Exception'>".
                    print(threading.get_ident(), repr(exc))
    
    
    class MyThread(threading.Thread):
        """Worker thread that runs ``func(args)`` when started."""

        def __init__(self, func, args, name):
            super().__init__()
            # Assigned one by one, after the base class has been initialised.
            self.name = name
            self.func = func
            self.args = args

        def run(self):
            # The stored ``args`` value goes in as one positional argument.
            callback = self.func
            callback(self.args)
    
    
    def main():
        """Store the area-code latitude/longitude on every matching row.

        Joins ``control_group_with_compute_res`` (using the ``area_code`` field
        of ``detail_data``) with ``jmtool_areacode_longitude_latitude``, splits
        the resulting ``(oid_timestamp, latitude, longitude)`` rows into chunks
        of 200, and runs one ``MyThread`` per chunk. Each worker serialises a
        small JSON object per row and hands it to ``update_rows``. Returns once
        all workers have finished.
        """
        with conn_fmac:
            with conn_fmac.cursor() as curs:
                sql = "SELECT tmp.oid_timestamp, ja.latitude, ja.longitude FROM ( SELECT oid_timestamp, detail_data ->> 'area_code' AS area_code FROM control_group_with_compute_res) tmp LEFT JOIN jmtool_areacode_longitude_latitude ja ON tmp.area_code = ja.area_code WHERE ja.area_code IS NOT NULL ORDER BY oid_timestamp ASC;"
                curs.execute(sql)
                tuple_l = curs.fetchall()

        tn, tstep = len(tuple_l), 200

        def tf(ts):
            """Process the chunk of rows that starts at index ``ts``."""
            print(ts)
            # Slicing clamps at the end of the list, so the last chunk may
            # simply be shorter than ``tstep``.
            for oid_timestamp, lat, lon in tuple_l[ts:ts + tstep]:
                r = {'from': 'jmtool_areacode', 'latitude': lat, 'longitude': lon}
                # NOTE(review): json.dumps raises TypeError for values such as
                # Decimal — confirm the latitude/longitude column types.
                update_rows(oid_timestamp, json.dumps(r, ensure_ascii=False))

        # One worker per chunk start offset; MyThread passes the offset to tf
        # as its single argument.
        tl = [MyThread(tf, i, tf.__name__) for i in range(0, tn, tstep)]

        for t in tl:
            # Must be set through the ``daemon`` attribute before start();
            # assigning to ``setDaemon`` only overwrites the method.
            t.daemon = False
            t.start()
        for t in tl:
            t.join()
    
    
    # Run the batch update only when this file is executed as a script.
    if __name__ == '__main__':
        main()
    
  • 相关阅读:
    把java程序作为windows服务运行
    安装CentOS7出现dracut-initqueue timeout-starting…starting timeout scripts 解决办法
    在Linux下打包tar文件时添加密码的方法
    tk mybatis通用mapper,复杂and or条件查询
    firewalld 端口转发(机器内转发+机器间转发)
    postgresql中的序列nextval
    postgresql 建立索引
    索引面试题分析
    PostgreSQL不等于查询索引方法
    net-snmp开发中出现“Error opening specified endpoint"" ”的解决方案
  • 原文地址:https://www.cnblogs.com/rsapaper/p/7698029.html
Copyright © 2011-2022 走看看