zoukankan      html  css  js  c++  java
  • 在 MongoDB 中查找缺失的数据包

      1 package syncPacker;
      2 
      3 import java.io.BufferedReader;
      4 import java.io.File;
      5 import java.io.FileReader;
      6 import java.io.FileWriter;
      7 import java.io.IOException;
      8 import java.net.UnknownHostException;
      9 import java.util.ArrayList;
     10 import java.util.Date;
     11 import java.util.HashMap;
     12 import java.util.List;
     13 import java.util.Map;
     14 
     15 import org.apache.http.message.BasicNameValuePair;
     16 import org.apache.log4j.Logger;
     17 import org.springframework.beans.factory.annotation.Autowired;
     18 import org.springframework.beans.factory.annotation.Value;
     19 import org.springframework.stereotype.Service;
     20 
     21 import com.mongodb.BasicDBObject;
     22 import com.mongodb.DB;
     23 import com.mongodb.DBCollection;
     24 import com.mongodb.DBObject;
     25 import com.mongodb.Mongo;
     26 import com.pro.framework.action.BaseController;
     27 
     28 import logCenter.SendLog;
     29 import net.sf.json.JSONObject;
     30 import syncPacker.bean.PatentBibliographicChangeBean;
     31 import syncPacker.bean.SyncDataPackageBean;
     32 import utils.DateUtils;
     33 import utils.DatetimeUtils;
     34 import utils.HttpUtils;
     35 
     36 /**
     37  * 增量数据分块打包 全处理
     38  * 
     39  * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action
     40  * 
     41  * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean.
     42  * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000
     43  */
     44 @Service
     45 public class SyncPacker_201603 extends BaseController {
     46 
     47     private static final long serialVersionUID = 1L;
     48 
     49     // 初始化:数据库接口
     50     @Autowired
     51     private SyncPackerDao dao;
     52 
     53     // 初始化:发送端地址
     54     @Value("${url_syncSender}")
     55     private String url_syncSender;
     56 
     57     // 初始化:本地文件存储路径
     58     @Value("${path_syncPacker_package}")
     59     private String path_syncPacker_package;
     60 
     61     // 初始化:读取最大数据包名称的地址
     62     @Value("${url_selectMaxPackageNumber}")
     63     private String url_selectMaxPackageNumber;
     64 
     65     // 初始化:存储最大数据包名称的地址
     66     @Value("${url_insertSyncDataPackage}")
     67     private String url_insertSyncDataPackage;
     68 
     69     // 初始化:查询条件Bean
     70     private SyncDataPackageBean bean = new SyncDataPackageBean();
     71     // 初始化:查询结果List
     72     private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>();
     73     // 初始化:形成的数据包名称
     74     private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>();
     75     // 初始化:已打包的增量数据ID表单
     76     private List<String> pdaIdList = new ArrayList<String>();
     77     // 初始化:用于删除数据的临时ID List
     78     private List<String> idPartList = new ArrayList<String>();
     79     // 初始化:传输协议
     80     private HttpUtils httpUtils = new HttpUtils();
     81     // 初始化:键值串
     82     private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>();
     83     // 初始化:历史最大包编号
     84     private String maxPackageNumber;
     85     // 初始化:记录打包完成后的数据版本
     86     private Integer centerNodeDataVersion;
     87     // 发送远程日志
     88     private SendLog sendLog = new SendLog();
     89     // 记录本地日志
     90     private Logger logger = Logger.getLogger(SyncPacker_201603.class);
     91     // 初始化:判断程序是否正在运行
     92     public static boolean isRunning = false;
     93 
     94     // 本次处理完成后的最大包编号
     95     private String packedPackageNumber;
     96     // 用于返回json的成功信息
     97     private String success = "success";
     98     
     99 
    100     /** 文件补发用:指定数据重新打包 */
    101     // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102
    102     public String packByPackageNumber() throws Exception {
    103 
    104         logMemory("本次请求处理开始。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() );
    105 
    106         String job_start = DateUtils.getCurrentTimeString(0);
    107         
    108         try {
    109             
    110 
    111             for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 
    112                     i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) {                
    113 
    114                 // 开始时间
    115                 String package_start = DateUtils.getCurrentTimeString(0);
    116                 
    117                 // 包编号
    118                 String packageNumber = String.format( "%06d", i );
    119                 
    120                 logMemory("开始,包编号:", packageNumber );
    121                 
    122                 //(1)读历史表
    123                 pbcList = selectList_changeHistory( packageNumber ); 
    124                 
    125                 if( null == pbcList ) {
    126                     logMemory("<span style="color:red;">数据为空 !!!</span>", "");
    127                     
    128                 } else{
    129                     logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() ));
    130 
    131                     //(2)插入MongoDB
    132                     insertMongoDB( pbcList );
    133                     pbcList.clear();
    134                 };
    135                 
    136                 logMemory("传输结束,包编号:" + packageNumber ," 用时: "
    137                         + DatetimeUtils.getDistanceTimes_string(package_start, DateUtils.getCurrentTimeString(0)) );
    138             }
    139         } catch (Exception e) {
    140             // 日志输出
    141             logMemory("系统发生异常", e.getMessage());
    142             e.printStackTrace();
    143         }
    144         
    145         logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd()
    146                 + " 用时: " + DatetimeUtils.getDistanceTimes_string( job_start, DateUtils.getCurrentTimeString(0)) );
    147         
    148         return SUCCESS;
    149     }
    150     
    151 
    152     /**
    153      * 读历史表
    154      * @param packageNumber
    155      * @return
    156      */
    157     private List<PatentBibliographicChangeBean> selectList_changeHistory( String packageNumber ){
    158         
    159         for ( int i = 0; i < 100; i++ ) {
    160             
    161             try {
    162                 return dao.selectList_changeHistory( packageNumber );                
    163             } catch (Exception e) {
    164                 // TODO Auto-generated catch block
    165 //                e.printStackTrace();
    166                 logMemory("系统发生异常", e.getMessage());
    167                 try {
    168                     Thread.sleep(500);
    169                     logMemory("暂停 0.5 秒", "" );
    170                 } catch (InterruptedException e1) {
    171                     // TODO Auto-generated catch block
    172                     e1.printStackTrace();
    173                 }
    174             } 
    175         }// loop end
    176         return null ;
    177         
    178     }
    179     
    180     private PatentBibliographicChangeBean select_changeHistory( String packageNumber ){
    181         
    182         for ( int i = 0; i < 100; i++ ) {
    183             
    184             try {
    185                 return dao.select_changeHistory( packageNumber );                
    186             } catch (Exception e) {
    187                 // TODO Auto-generated catch block
    188 //                e.printStackTrace();
    189                 logMemory("系统发生异常", e.getMessage());
    190                 try {
    191                     Thread.sleep(500);
    192                     logMemory("暂停 0.5 秒", "" );
    193                 } catch (InterruptedException e1) {
    194                     // TODO Auto-generated catch block
    195                     e1.printStackTrace();
    196                 }
    197             } 
    198         }// loop end
    199         return null ;
    200         
    201     }
    202     
    203     
    204     
    205 
    206     /**
    207      * 插入 MongoDB
    208      */
    209     public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) {    
    210 
    211         //# MongoDB(数据加载目标)
    212         String syncLoadIntoMongoDbService = "10.78.2.23:27017";
    213         String syncLoadIntoMongoDbName = "patent_search_extend";
    214         String syncLoadIntoMongoTable = "patent_bibliographic_20160319";
    215 
    216         // 加载开始
    217 //        logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );
    218 
    219         Mongo m = null;
    220         try {
    221             m = new Mongo( syncLoadIntoMongoDbService );
    222         } catch (UnknownHostException e) {
    223             e.printStackTrace();
    224             logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());
    225         }
    226 
    227         // 库名
    228         DB db = m.getDB( syncLoadIntoMongoDbName );
    229 //        logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );
    230 
    231         // 表名
    232         DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
    233 //        logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable );
    234         
    235         // 循环列表,将每个元素插入数据库
    236         for( PatentBibliographicChangeBean pbcBean : pbcList ){
    237             
    238             
    239             
    240             //(1)读取一条著录数据
    241 //            JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() );
    242             
    243 //            if( null == json ) return ;
    244 
    245             // 列,值
    246             BasicDBObject insDoc = new BasicDBObject();
    247             
    248             insDoc.put("abstract_No"    , pbcBean.getAbstract_No()    );
    249             insDoc.put("app_Addr"       , pbcBean.getApp_Addr()       );
    250             insDoc.put("app_Cn"         , pbcBean.getApp_Cn()         );
    251             insDoc.put("app_Country"    , pbcBean.getApp_Country()    );
    252             insDoc.put("app_Date"       , pbcBean.getApp_Date()       );
    253             insDoc.put("app_Name"       , pbcBean.getApp_Name()       );
    254             insDoc.put("app_Sn"         , pbcBean.getApp_Sn()         );
    255             insDoc.put("app_Type"       , pbcBean.getApp_Type()       );
    256             insDoc.put("app_Zip"        , pbcBean.getApp_Zip()        );
    257             insDoc.put("ecla"           , pbcBean.getEcla()           );
    258             insDoc.put("fi"             , pbcBean.getFi()             );
    259             insDoc.put("ft"             , pbcBean.getFt()             );
    260             insDoc.put("id"             , pbcBean.getId()             );
    261             insDoc.put("inv_Title"      , pbcBean.getInv_Title()      );
    262             insDoc.put("invent_Type"    , pbcBean.getInvent_Type()    );
    263             insDoc.put("inventor"       , pbcBean.getInventor()       );
    264             insDoc.put("ipc_Standard"   , pbcBean.getIpc_Standard()   );
    265             insDoc.put("locarno"        , pbcBean.getLocarno()        );
    266             insDoc.put("operation_Time" , pbcBean.getOperation_Time() );
    267             insDoc.put("operation_Type" , pbcBean.getOperation_Type() );
    268             insDoc.put("package_Number" , pbcBean.getPackage_Number() );
    269             insDoc.put("pct_App_Cn"     , pbcBean.getPct_App_Cn()     );
    270             insDoc.put("pct_App_Date"   , pbcBean.getPct_App_Date()   );
    271             insDoc.put("pct_App_Sn"     , pbcBean.getPct_App_Sn()     );
    272             insDoc.put("pct_Date"       , pbcBean.getPct_Date()       );
    273             insDoc.put("pct_Pub_Cn"     , pbcBean.getPct_Pub_Cn()     );
    274             insDoc.put("pct_Pub_Date"   , pbcBean.getPct_Pub_Date()   );
    275             insDoc.put("pct_Pub_Lang"   , pbcBean.getPct_Pub_Lang()   );
    276             insDoc.put("pct_Pub_Sn"     , pbcBean.getPct_Pub_Sn()     );
    277             insDoc.put("prn"            , pbcBean.getPrn()            );
    278             insDoc.put("prn_Cn"         , pbcBean.getPrn_Cn()         );
    279             insDoc.put("prn_Date"       , pbcBean.getPrn_Date()       );
    280             insDoc.put("prn_Sn"         , pbcBean.getPrn_Sn()         );
    281             insDoc.put("prn_Type"       , pbcBean.getPrn_Type()       );
    282             insDoc.put("pub_Cn"         , pbcBean.getPub_Cn()         );
    283             insDoc.put("pub_Date"       , pbcBean.getPub_Date()       );
    284             insDoc.put("pub_Sn"         , pbcBean.getPub_Sn()         );
    285             insDoc.put("pub_Type"       , pbcBean.getPub_Type()       );
    286             insDoc.put("uc"             , pbcBean.getUc()             );
    287             
    288             collection.insert(insDoc);
    289             insDoc.clear();
    290             
    291         }
    292 
    293         // 循环遍历pdaBeanList
    294 //        System.out.println("loading ...");
    295 //        logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size());
    296 
    297         // 当前记录编号
    298         // int currentRowNumber = 0;
    299 
    300         if ( m != null) m.close();
    301         
    302 //        System.out.println("Load finished.");
    303     }
    304     
    305 
    306     
    307     /**
    308      * 在 MongoDB 中查找缺失的数据包
    309      * @return
    310      * @throws Exception
    311      */
    312     public String findLostPackages_from_mongoDB() throws Exception {
    313         
    314 
    315         //# MongoDB(数据加载目标)
    316         String syncLoadIntoMongoDbService = "10.78.2.23:27017";
    317         String syncLoadIntoMongoDbName = "patent_search_extend";
    318         String syncLoadIntoMongoTable = "patent_bibliographic_20160319";
    319 
    320         // 加载开始
    321 //        logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );
    322 
    323         Mongo m = null;
    324         try {
    325             m = new Mongo( syncLoadIntoMongoDbService );
    326         } catch (UnknownHostException e) {
    327             e.printStackTrace();
    328             logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());
    329         }
    330 
    331         // 库名
    332         DB db = m.getDB( syncLoadIntoMongoDbName );
    333 //        logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );
    334 
    335         // 表名
    336         DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
    337 //        logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable );        
    338         
    339         PatentBibliographicChangeBean pbcBean = new PatentBibliographicChangeBean();
    340         
    341 
    342         for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 
    343                 i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) {    
    344             
    345             //(1)从 Oracle 取一个著录ID
    346             pbcBean = select_changeHistory( String.format( "%06d", i ));
    347             
    348             if( null == pbcBean ) {
    349                 logMemory("数据为空!  ", "包编号:"+i+",著录ID:" + pbcBean.getId()  ); continue ;
    350             }
    351             
    352 //            logMemory("包编号:"+i+",著录ID:", pbcBean.getId() );
    353             
    354             //(2)查询 MongoDB 中是否存在        
    355             BasicDBObject docFind = new BasicDBObject( "id", pbcBean.getId() );
    356             DBObject findResult = collection.findOne( docFind );
    357             
    358             if( null == findResult || "".equals( findResult )){
    359                 logMemory("缺失  !!! ", "包编号:"+i+",著录ID:" + pbcBean.getId()  );
    360             }
    361         }
    362         
    363         // retrieve
    364 
    365 
    366         if ( m != null) m.close();
    367         
    368         
    369         return SUCCESS ;
    370     }
    371     
    372     
    373     
    374     
    375     
    376     
    377     
    378     
    379     
    380     
    381     
    382     
    383     
    384     
    385 
    386     /** 记录日志 */
    387     private void logMemory(String behavior, String content) {
    388         // 向服务器发送日志
    389 //        sendLog.send("syncPacker", behavior, content);
    390         // 记录本地日志
    391         logger.info(DateUtils.getNow() + " " + behavior + " :" + content);
    392         // 控制台输出日志
    393 //        System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content);
    394     }
    395 
    396     @Override
    397     public String insert() throws Exception {
    398         return null;
    399     }
    400 
    401     @Override
    402     public String update() throws Exception {
    403         return null;
    404     }
    405 
    406     @Override
    407     public String selectList() throws Exception {
    408         return null;
    409     }
    410 
    411     @Override
    412     public String delete() throws Exception {
    413         return null;
    414     }
    415 
    416     public static boolean isRunning() {
    417         return isRunning;
    418     }
    419 
    420     public static void setRunning(boolean isRunning) {
    421         SyncPacker_201603.isRunning = isRunning;
    422     }
    423 
    424     public Integer getCenterNodeDataVersion() {
    425         return centerNodeDataVersion;
    426     }
    427 
    428     public void setCenterNodeDataVersion(Integer centerNodeDataVersion) {
    429         this.centerNodeDataVersion = centerNodeDataVersion;
    430     }
    431 
    432     public String getSuccess() {
    433         return success;
    434     }
    435 
    436     public void setSuccess(String success) {
    437         this.success = success;
    438     }
    439 
    440     public String getMaxPackageNumber() {
    441         return maxPackageNumber;
    442     }
    443 
    444     public void setMaxPackageNumber(String maxPackageNumber) {
    445         this.maxPackageNumber = maxPackageNumber;
    446     }
    447 
    448     public String getPackedPackageNumber() {
    449         return packedPackageNumber;
    450     }
    451 
    452     public void setPackedPackageNumber(String packedPackageNumber) {
    453         this.packedPackageNumber = packedPackageNumber;
    454     }
    455 
    456     public SyncDataPackageBean getBean() {
    457         return bean;
    458     }
    459 
    460     public void setBean(SyncDataPackageBean bean) {
    461         this.bean = bean;
    462     }
    463 }
  • 相关阅读:
    C语言I博客作业07
    C语言I博客作业06
    C语言I博客作业05
    C语言I博客作业04
    C语言I博客作业02
    Django连接MySql数据库
    asyncio异步编程
    Django-rest framework框架
    Git版本管理工具详细教程
    MySQL的sql_mode模式说明及设置
  • 原文地址:https://www.cnblogs.com/livon/p/5302164.html
Copyright © 2011-2022 走看看