zoukankan      html  css  js  c++  java
  • 空气质量国控站点数据插值出全国3181个城市值,利用了多线程

    #coding=utf-8
    import arcpy
    import math
    import sys
    import datetime
    import pymssql
    import json
    import os
    import time
    import uuid
    import logging
    import multiprocessing
    import random
    from arcpy import env
    from arcpy.sa import *
    
    
    
    def drawpng(date1,hour1,pullute,where):
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"进入drawpng:"+pullute);
        #消除多进程报错
        time.sleep(1.1)
        newTempDir = r"E:	empIDWmpenvr_" + str(time.strftime('%Y%m%d%H%M%S')) + str(random.random()*10000);
        os.mkdir(newTempDir)
        os.environ["TEMP"] = newTempDir
        os.environ["TMP"] = newTempDir
    
        env.workspace = r"E:idw	if_hour";
        mapPath =r"E:idw	if_hour";
    
        cnxn =pymssql.connect(host='127.0.0.1', user='sa', password='xzs@123', database='nationAir')
        cursor = cnxn.cursor();
    
        #查询污染物数据
        sql="SELECT b.stationcode,"+pullute+" from monitor_site_hour a,hf_site b where a.site=b.stationname and a.city=b.cityname and datetime='"+date1+" "+hour1+":00:00'"+" and "+where;
        cursor.execute(sql);
        cursorData = cursor.fetchall();
        
    
        #把查询数据保存到SHP
        for item in cursorData:
            fc = "sites_"+pullute+".shp";
            where ="SITEID='"+str(item[0])+"'";
            rows = arcpy.UpdateCursor(fc,where);
            for row in rows:
                row.setValue(str(pullute),float(item[1]));
                rows.updateRow(row)
            del rows,fc;
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"数据保存到shp完成:"+pullute);
    
        #进行差值操作
        inPointFeatures = "sites_"+pullute+".shp";
        arcpy.CheckOutExtension("Spatial")
        outSplineBarriers = Idw(inPointFeatures,pullute);
        tif ="pointraster_"+pullute+".tif";
        if os.path.exists(mapPath+r"pointraster_"+pullute+".tif"):
            #老tif文件删除
            os.remove(mapPath+r"pointraster_"+pullute+".tif")
        outSplineBarriers.save(tif);
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"插值操作完成:"+pullute);
    
        #清理内存
        del tif,inPointFeatures,outSplineBarriers;
        cursor.close;
        cnxn.close;
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"drawpng完成:"+pullute);
    
    def insertData(date1,hour1):
    
        env.workspace = r"E:idw	if_hour";
        mapPath =r"E:idw	if_hour";
    
        cnxn =pymssql.connect(host='127.0.0.1', user='sa', password='xzs@123', database='nationAir')
        cursor = cnxn.cursor();
        cursor1 = cnxn.cursor();
    
        sql1="SELECT  citycode,longitude,latitude FROM hf_city";
        cursor.execute(sql1);
        cursorData = cursor.fetchall();
        
        tif_so2 ="pointraster_so2.tif";
        tif_no2 ="pointraster_no2.tif";
        tif_co ="pointraster_co.tif";
        tif_o3 ="pointraster_o3.tif";
        tif_pm10 ="pointraster_pm10.tif";
        tif_pm25 ="pointraster_pm25.tif";
        
        #insert到数据库
        for item in cursorData:
            point=item[1]+" "+item[2];
            try:
                result_so2 = arcpy.GetCellValue_management(tif_so2,point);
                result_no2 = arcpy.GetCellValue_management(tif_no2,point);
                result_co = arcpy.GetCellValue_management(tif_co,point);
                result_o3 = arcpy.GetCellValue_management(tif_o3,point);
                result_pm10 = arcpy.GetCellValue_management(tif_pm10,point);
                result_pm25 = arcpy.GetCellValue_management(tif_pm25,point);
                updateSql="insert into hf_idw(citycode,datetimes,so2,no2,co,o3,pm10,pm25) values('%s','%s',%s,%s,%s,%s,%s,%s)" % (item[0],date1+" "+hour1+":00:00",result_so2.getOutput(0),result_no2.getOutput(0),result_co.getOutput(0),result_o3.getOutput(0),result_pm10.getOutput(0),result_pm25.getOutput(0));
                cursor1.execute(updateSql);
            except Exception, e:
                logging.debug(e.message);
            cnxn.commit();
                
        #清理内存
        del result_so2,result_no2,result_co,result_o3,result_pm10,result_pm25;
        cursor.close;
        cursor1.close;
        cnxn.close;
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"insertData完成:");
    
    
    LOG_FILENAME="E:idwlog_hour.txt";
    logging.basicConfig(filename=LOG_FILENAME,level=logging.NOTSET);
    
    
    
    if __name__ == '__main__':#windows下必须加这句
    
        cnxn =pymssql.connect(host='127.0.0.1', user='sa', password='xzs@123', database='nationAir')
        cursor = cnxn.cursor();
    
        d1 = datetime.datetime.now();
        date1=d1.strftime('%Y-%m-%d');
        d3= d1 + datetime.timedelta(hours=-1);
        hour1=d3.strftime('%H');
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"小时值7项变量绘图开始"+date1+",小时:"+hour1);
    
        checksql="SELECT count(*) FROM monitor_site_hour where datetime='"+date1+" "+hour1+":00:00'";
        cursor.execute(checksql);
        checkdata= cursor;
    
        num=0;
        for item0 in checkdata:
            num=item0[0];
            logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"检查数据库数量:"+str(num));
        if num>1000:
            logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"检查通过,开始计算");
    
            #开启6个线程并行计算
            pool = multiprocessing.Pool(processes = 6)
            pool.apply_async(drawpng, (date1,hour1,"so2","so2!='_'",));
            pool.apply_async(drawpng, (date1,hour1,"no2","no2!='_'",));
            pool.apply_async(drawpng, (date1,hour1,"co","co!='_'",));
            pool.apply_async(drawpng, (date1,hour1,"o3","o3!='_'",));
            pool.apply_async(drawpng, (date1,hour1,"pm10","pm10!='_'",));
            pool.apply_async(drawpng, (date1,hour1,"pm25","pm25!='_'",));
            pool.close()
            pool.join()
    
            insertData(date1,hour1);
    
        logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"计算结束");
    
        cursor.close;
        cnxn.close;
  • 相关阅读:
    Hbase数据备份&&容灾方案
    maven 高级玩法
    Python操作MySQL -即pymysql/SQLAlchemy用法
    python
    Redis的AOF功能
    Redis的快照功能
    查看哪些进程占用了SWAP分区?
    Java进程CPU使用率高排查
    利用iptables实现基于端口的网络流量统计
    从free命令看Linux内存管理
  • 原文地址:https://www.cnblogs.com/tiandi/p/7771746.html
Copyright © 2011-2022 走看看