zoukankan      html  css  js  c++  java
  • IBMMQ之取发文件

    package com.citic.main;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.FileReader;
    import java.io.FilenameFilter;
    import java.io.PrintStream;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.regex.Pattern;
    
    import com.citic.util.*;
    import com.citic.util.comm.*;
    import com.citic.msgutil.ProImpl;
    
    /**
     * 本类功能:作为二级主类,上接MAIN方法,下接其它操作子类,主要对报文进行发送接收
     * 落脚点在发送MSG方法上:sendmsgs,最终由此方法提交并关闭MQ队列管理器
     * 不管发送还是接收报文,最终都要保证会调用sendmsgs方法
     * @author db2admin
     *
     */
    public class MessageProcess implements IConstants{
        // Payloads of the current processInMessage run: the supplied file names, or the
        // keys of the map returned by the MQ receiver.
        private static String[] MQString = null;
        // NOTE(review): never read in this class; the readcnt parameter of processInMessage shadows it.
        private static int readcnt=5;
        // Raw "debuglevel" configuration value.
        private static String dval=ConfigFileUtil.getValue("debuglevel");
        // Level passed to CommFun.log by most calls in this class; ALL when the configured value is "".
        // NOTE(review): a null configuration value would reach Integer.parseInt and throw.
        private static int debuglevel=("".equals(dval)?ALL:Integer.parseInt(dval));
        // Shared MQ helper used for receiving, sending and the final commit.
        private static MQUtil mqutil=new MQUtil();
        // NOTE(review): instance field that no method of this class reads.
        private int i=100;
        /*
        public static void main(String[] args) throws Exception{
            //log2file(ConfigFileUtil.getInstance().getPathName(".."));
            boolean fileflag=false;
            processOutMessage("d:\data\supis\data\datalist.txt");
        }*/
        
        /**
         * 0. Log handling: redirects {@code System.out} and {@code System.err} to
         * append-mode files inside {@code filepth + "logs/"}.
         * <p>
         * The file names are {@code systemout<stamp>.txt} and {@code systemerrout<stamp>.txt},
         * where the stamp is the first eight characters of {@code CommFun.strNow()}.
         * If a file cannot be opened the stack trace is printed and the standard
         * streams are left untouched.
         *
         * @param filepth base path; "logs/" is appended verbatim, so it is expected
         *                to end with a path separator
         */
        private static void log2file(String filepth){
            File logDir = new File(filepth + "logs/");
            if (!logDir.exists()) {
                // mkdirs also creates missing parent directories, which mkdir would not.
                logDir.mkdirs();
            }

            FileOutputStream out = null;
            boolean installed = false;
            try {
                // Read the stamp once so both files always carry the same one.
                String stamp = CommFun.strNow().substring(0, 8);
                out = new FileOutputStream(logDir.getPath()
                        + "/systemout" + stamp + ".txt", true);
                FileOutputStream errout = new FileOutputStream(logDir.getPath()
                        + "/systemerrout" + stamp + ".txt", true);

                System.setOut(new PrintStream(out));
                System.setErr(new PrintStream(errout));
                installed = true;
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } finally {
                // If the second file could not be opened, release the first one
                // instead of leaking its handle.
                if (!installed && out != null) {
                    try {
                        out.close();
                    } catch (Exception closeError) {
                        closeError.printStackTrace();
                    }
                }
            }
        }
        
        public static void processInMessage(String dataDate, final String filepatten,String sendpath) {
            String filePath = ((sendpath==null || "".equals(sendpath))?ConfigFileUtil.getInstance().getPathName(dataDate)+"send":sendpath);
            CommFun.log(debuglevel, "filePath:"+filePath+",dataDate:"+dataDate+",filepatten:"+filepatten);
            File file = new File(filePath);
            File[] filelist = file.listFiles(new FilenameFilter() {
                private Pattern pattern = Pattern.compile(filepatten.replace("_R",
                        "") + ".*\.xml");
    
                @Override
                public boolean accept(File dir, String name) {
                    return pattern.matcher(name).matches();
                }
            });
            String[] files=new String[filelist.length];
            int i=0;
            for (File f : filelist) {
                // processInMessage(f.getAbsolutePath(),0);
                String fapth=f.getAbsolutePath();
                CommFun.log(debuglevel, "添加文件:"+fapth);
                files[i++]=fapth;
            }
            processInMessage(files,0,false);
            
        }
        
        /**
         * Receives messages, parses them and forwards the resulting files through sendmsgs.
         * <p>
         * When {@code filename} is non-null its entries are processed directly and the
         * "fileFlag" configuration value is set to "true". Otherwise messages are fetched
         * through the MQ helper; the keys of the returned map are the payloads and the
         * values are logged as the original file names.
         * <p>
         * Each payload is parsed with {@code ProImpl.parserXml} unless {@code redoflag} is
         * set. Every key of the parse result is queued for sending. For MQ payloads the
         * {@code key:value} pairs of the result are also written to a text file under the
         * configured "docs" directory. sendmsgs is always called at the end, so the MQ
         * commit happens even when nothing is sent.
         *
         * @param filename entries to process, or null to read from MQ
         * @param readcnt  count passed to the MQ receiver; when not positive, the "readcnt"
         *                 configuration value is used if it is non-empty
         * @param redoflag when true, payloads are not parsed and are queued for sending as-is
         */
        public static void processInMessage(String[] filename,int readcnt,boolean redoflag){
            CommFun.log(INFO,filename);
            boolean mqflag=false; // true when the payloads were fetched from MQ
            HashMap<String,String> hmmq=null;
            if (filename != null) {
                ConfigFileUtil.setValue("fileFlag", "true");
                MQString = filename;
            } else {
                String rcnt = (readcnt > 0 ? String.valueOf(readcnt)
                        : ConfigFileUtil.getValue("readcnt"));
                if (rcnt != null && !"".equals(rcnt)) {
                    readcnt = Integer.parseInt(rcnt);
                }
                CommFun.log(INFO, "每次读取MQ条数为:"+readcnt+"!");
                hmmq=mqutil.runGoupReceier(readcnt);

                if (hmmq==null || hmmq.size()<=0) {
                    CommFun.log("没有取到MQ数据!退出!");
                    // MQString is static: clear it so payloads left over from an
                    // earlier call are not processed again.
                    MQString=null;
                }else{
                    MQString=hmmq.keySet().toArray(new String[hmmq.size()]);
                }
                mqflag=true;
            }
            if(MQString!=null){
                CommFun.log(debuglevel, "共获取MQ条数为:"+MQString.length);
            }else{
                CommFun.log(debuglevel, "获取MQ条数为:0");
            }

            ProImpl pil=new ProImpl();
            HashMap<String,String> hm0=new HashMap<String, String>();
            for (int i = 0; MQString != null && i < MQString.length; i++) {
                String payload = MQString[i];
                // For MQ payloads the map value is logged as the original file name; for
                // file input the entry itself is the name (the array used to be
                // concatenated here, which only printed its identity).
                String origin = (hmmq!=null && hmmq.size()>0) ? hmmq.get(payload) : payload;
                // Cap the preview at the payload length; an unconditional
                // substring(0, 200) throws on messages shorter than 200 characters.
                // NOTE(review): the log text says 300 characters but 200 are printed.
                String preview = mqflag
                        ? payload.substring(0, Math.min(payload.length(), 200))
                        : payload;
                CommFun.log(INFO,
                        "解析字串["+MQString.length+"-"+i+":原文件名为:"+origin+"]前300个字符:["
                                + preview + "]");

                HashMap<String,String> hm;
                if(redoflag){
                    // Redo: skip parsing and send the entry directly.
                    hm=new HashMap<String, String>();
                    hm.put(payload, "");
                }else{
                    CommFun.log(0,"解析文件:"+payload+"开始!");
                    hm=pil.parserXml(payload);
                    CommFun.log(0,"解析文件:"+payload+"完毕!");
                    // Defensive: whether parserXml can return null is not visible here.
                    if (hm == null) {
                        hm=new HashMap<String, String>();
                    }
                }

                StringBuffer[] stb = new StringBuffer[hm.size()];
                int i1 = 0;
                for (String s : hm.keySet()) {
                    stb[i1++] = new StringBuffer(s + ":" + hm.get(s));
                    hm0.put(s, "");
                }
                // 0. Record what was extracted from MQ.
                if (mqflag && hm.size() > 0) {
                    CommFun.log(debuglevel, "MQ处理文件!");
                    String tmpfilename = ConfigFileUtil.getInstance().getPathName()
                            + "docs" + File.separator + CommFun.strNowRand().substring(0, 8)
                            + ".txt";
                    CommFun.log(INFO, tmpfilename);
                    FileOperation.stringbuffer2file(stb, tmpfilename);
                }else{
                    CommFun.log(debuglevel, "此次无文件发送,继续循环!");
                }
            }
            // 1. Send (also commits the MQ work when there is nothing to send).
            sendmsgs(hm0);
            CommFun.log(debuglevel, "处理完毕!");
        }
        
        /**
         * Convenience overload of {@link #processOutMessage(String, String, String)}
         * that passes an empty send path.
         *
         * @param dataDate   forwarded unchanged
         * @param filepatten forwarded unchanged
         */
        public static void processOutMessage(String dataDate, final String filepatten){
            final String noSendPath = "";
            processOutMessage(dataDate, filepatten, noSendPath);
        }
        
        /**
         * 2.1对指定日期下的按模式匹配文件列表进行批量发送
         * @param dataDate
         * @param filepatten
         * @throws Exception
         */
        public static void processOutMessage(String dataDate, final String filepatten,String sendpath) {
            String filePath = ((sendpath==null || "".equals(sendpath))?ConfigFileUtil.getInstance().getPathName(dataDate)+"send":sendpath);
            CommFun.log(debuglevel, "filePath:"+filePath+",dataDate:"+dataDate+",filepatten:"+filepatten);
            File file = new File(filePath);
            File[] filelist = file.listFiles(new FilenameFilter() {
                private Pattern pattern = Pattern.compile(filepatten.replace("_S",
                        "").replace(".xml", "") + ".*\.xml");
    
                @Override
                public boolean accept(File dir, String name) {
                    return pattern.matcher(name).matches();
                }
            });
            HashMap<String, String> hm = new HashMap<String, String>();
            for (File f : filelist) {
                // processInMessage(f.getAbsolutePath(),0);
                String fapth=f.getAbsolutePath();
                CommFun.log(debuglevel, "添加文件:"+fapth);
                hm.put(fapth, "");
            }
            sendmsgs(hm);
            CommFun.log(debuglevel, "通过通配符处理列表文件发送完毕,日期为:" + dataDate
                    + ",filepattern:" + filepatten);
        }
        
        /**
         * 2.2 Sends the files named in a list file.
         * <p>
         * The list file is read line by line until end of file or the first empty line.
         * Each line is treated as a path: existing files are queued, missing ones are
         * only logged. The queued paths are then handed to sendmsgs in one batch.
         * Failures while reading or sending are logged and not rethrown.
         *
         * @param filename path of the list file; null or empty returns immediately
         * @throws RuntimeException if the list file does not exist
         */
        public static void processOutMessage(String filename) {
            if (filename == null || "".equals(filename)) {
                CommFun.log(debuglevel, "传入参数无效");
                return;
            }

            File file = new File(filename);
            if (!file.exists()) {
                CommFun.log(debuglevel, "[" + filename + "]不存在");
                throw new RuntimeException("[" + filename + "]不存在");
            }

            BufferedReader bf = null;
            try {
                bf = new BufferedReader(new FileReader(file));
                String st;
                HashMap<String,String> hm=new HashMap<String, String>();
                while ((st = bf.readLine()) != null && !"".equals(st)) {
                    CommFun.log(debuglevel, st);
                    if(new File(st).exists()){
                        hm.put(st, "");
                    }else{
                        CommFun.log(debuglevel, "["+st+"]不存在!");
                    }
                }
                sendmsgs(hm);
                CommFun.log(debuglevel, "通过列表文件发送完毕");
            } catch (Exception e) {
                // Best effort as before, but log it the same way the other methods of this class do.
                CommFun.log(ERR, e.getMessage());
                e.printStackTrace();
            } finally {
                // The reader used to be left open; always release the file handle.
                if (bf != null) {
                    try {
                        bf.close();
                    } catch (Exception closeError) {
                        CommFun.log(ERR, closeError.getMessage());
                    }
                }
            }
        }
        
        /**
         * 2.3 Generates messages and sends them.
         * <p>
         * For every comma-separated entry of {@code msgtype}, {@code ProImpl.createXml}
         * is called. Each non-empty result is written as {@code key:value} lines to a text
         * file under the configured "docs" directory, and its entries are collected for
         * sending. The collected keys are sent only when the "sendflag" configuration
         * value starts with "Y" (case-insensitive); otherwise the collection is discarded.
         * After sendmsgs, the collected values are passed to
         * {@code DBOperation.updateMsgHeadSend}; an SQLException there is logged and not
         * rethrown.
         *
         * @param dateString   date as an integer string; parsed once and passed to createXml
         * @param msgtype      comma-separated message types, one createXml call per entry
         * @param sendType     passed through to createXml
         * @param appendString passed through to createXml
         * @param sender       passed through to createXml
         * @throws NumberFormatException if {@code dateString} is not a valid integer
         */
        public static void processOutMessage(String dateString, String msgtype,
                String sendType, String appendString, String sender) {
            ProImpl pil=new ProImpl();
            HashMap<String,String> hm0=new HashMap<String, String>();
            String[] msgtypes=msgtype.split(",");
            // The date does not change between iterations, so parse it once up front.
            int dateValue=Integer.parseInt(dateString);
            for(int i=0;i<msgtypes.length;i++){
                CommFun.log(ALL, "生成第[" + i + "]个报文:[" + msgtypes[i] + "]");
                HashMap<String,String> hm=pil.createXml(dateValue,msgtypes[i],sendType,appendString,sender);
                // Defensive: whether createXml can return null is not visible here.
                if (hm == null || hm.size() == 0) {
                    CommFun.log(debuglevel, "此次无文件发送,继续坛坛循环!");
                    continue;
                }
                // 0. Record the generated files.
                CommFun.log(debuglevel, "MQ处理文件!");
                StringBuffer[] stb = new StringBuffer[hm.size()];
                int i1 = 0;
                CommFun.log(DATA, "需要发送以下["+hm.size()+"]个文件!");
                for (String s : hm.keySet()) {
                    String tmpmsgid=hm.get(s);
                    stb[i1++] = new StringBuffer(s + ":" + tmpmsgid);
                    hm0.put(s, tmpmsgid);
                    CommFun.log(DATA, s + ":" + tmpmsgid);
                }
                String tmpfilename = ConfigFileUtil.getInstance().getPathName()
                        + "docs" + File.separator + CommFun.strNowRand().substring(0, 8)
                        + ".txt";
                CommFun.log(DATA, tmpfilename);
                FileOperation.stringbuffer2file(stb, tmpfilename);
            }

            // A missing "sendflag" entry means "do not send" instead of a NullPointerException.
            String flagstr=ConfigFileUtil.getValue("sendflag");
            boolean sendflag=flagstr!=null && flagstr.toUpperCase().startsWith("Y");

            if(sendflag){
                CommFun.log(DATA,"发送文件:"+hm0.toString());
            }else{
                hm0=new HashMap<String, String>();
                CommFun.log(DATA,"只生成数据,不发送"+hm0.toString());
            }

            // 1. Send (with an empty map this only commits the MQ work).
            sendmsgs(hm0);
            try {
                DBOperation.updateMsgHeadSend(hm0.values().toArray());
            } catch (SQLException e) {
                CommFun.log(ERR, e.getMessage());
                e.printStackTrace();
            }

            CommFun.log(debuglevel, "处理完毕!");
        }
        
        
        /**
         * Shared tail of every send/receive path: sends the keys of {@code hm} through
         * the MQ helper and then commits.
         * <p>
         * Null and empty keys are logged and skipped. The commit is issued
         * unconditionally, including when {@code hm} is null or empty.
         *
         * @param hm map whose keys are the items to send; the values are not used here
         */
        private static void sendmsgs(HashMap<String,String> hm){
            // 1. Send
            if(hm!=null && hm.size()>0){
                CommFun.log("MQ共需要发送文件["+hm.size()+"]个");
                String[] candidates=new String[hm.size()];
                int count=0;
                for (String s : hm.keySet()) {
                    if (s == null || "".equals(s)) {
                        CommFun.log(debuglevel, "[" + s + "]" + "文件为空,不发送!");
                    } else {
                        CommFun.log(debuglevel, "[" + s + "]" + "文件不为空,发送!");
                        candidates[count++]=s;
                    }
                }
                if (count > 0) {
                    // Pass only the filled slots; skipped keys used to leave trailing
                    // null entries in the array given to the sender.
                    String[] strs=new String[count];
                    System.arraycopy(candidates, 0, strs, 0, count);
                    mqutil.runGoupSender(strs);
                }
                // Report what was actually handed to the sender, not the map size.
                CommFun.log("MQ已发送文件["+count+"]个");
            }
            // 2. Final MQ queue-manager handling.
            mqutil.commit();
        }
    }
    View Code
  • 相关阅读:
    Callable的Future模式
    并发队列
    并发工具类和线程池
    安全与死锁问题
    ConcurrentHashMap底层实现
    Map集合常见面试题
    List集合框架
    类加载器
    Activiti中个人任务分配
    流程定义
  • 原文地址:https://www.cnblogs.com/silencemaker/p/12632254.html
Copyright © 2011-2022 走看看