MailBackupSystem流程:
入口:web.xml 拦截所有 jsp 页面,未登录时跳转到 /views/admin/login.jsp 进行登录
①新建工单:
(1)quanyi_list.js
/**
 * Opens the "new order" dialog with blank forms and points the shared
 * `url` variable at the insert endpoint for the subsequent save.
 */
function newOrder() {
    var dialog = $('#dlg').dialog('open');
    dialog.dialog('setTitle', 'New User');

    // Reset both the order form and the upload form before reuse.
    $('#fm').form('clear');
    $('#up').form('clear');

    // Pre-select the default entry of the combobox.
    $('#fm-select').combobox('setValue', '+MAILMF');

    // `url` is intentionally not declared here: it is read elsewhere in the page.
    url = 'quanyi/insert.htmls';
}
(2)BatchDiffconfigController控制器
1 @Controller 2 @RequestMapping(value = "/quanyi") 3 public class BatchDiffconfigController { 4 @RequestMapping(value = "/insert") 5 @ResponseBody 6 public String insert(HttpServletRequest request, HttpServletResponse response, BatchDiffconfigPO batchDiffconfigPO) { 7 int result = 0; 8 try { 9 result = batchDiffconfigService.insert(batchDiffconfigPO); 10 } catch (DuplicateKeyException e) { 11 return ResultTemplate.fail("已存在相同的工单代码"); 12 } catch (Exception e) { 13 return ResultTemplate.fail("保存失败,请联系管理员"); 14 } 15 if (result > 0) { 16 return ResultTemplate.success(); 17 } else { 18 return ResultTemplate.fail(); 19 } 20 21 } 22 }
(3)业务代码
1 @Service 2 public class BatchDiffconfigServiceImpl implements BatchDiffconfigService { 3 @Override 4 public int insert(BatchDiffconfigPO batchDiffconfigPO) throws Exception { 5 Long batchid = batchDiffconfigPOExtMapper.createPrimaryKey(); 6 batchDiffconfigPO.setBatchid(batchid); 7 batchDiffconfigPO.setCreatetime(new Date()); 8 if (StringUtils.isEmpty(batchDiffconfigPO.getFileiuploadpath())) { 9 batchDiffconfigPO.setProcessstatus(Constants.BATCH_ORDER_NEED_UPLOAD_FILE); 10 } else { 11 batchDiffconfigPO.setProcessstatus(Constants.BATCH_ORDER_READIY); 12 } 13 return batchDiffconfigPOMapper.insert(batchDiffconfigPO); 14 15 } 16 }
(3)BatchDiffconfigPOMapper.xml 中对应的插入 sql
<!-- Inserts one row into BATCH_DIFFCONFIG; each column is bound from the
     BatchDiffconfigPO property of the same (lower-case) name. -->
<insert id="insert" parameterType="com.richinfo.quanyi.model.BatchDiffconfigPO">
  insert into BATCH_DIFFCONFIG (
    BATCHID, BATCHCODE, PROVCODE, AREACODE,
    BEGINDATE, ENDDATE, MEMO, FILEIUPLOADPATH,
    PROCESSSTATUS, CREATENAME, MODIFYNAME, CREATETIME,
    MODIFYTIME, SERVICECODE, UPLOADNUM, STATUS,
    INCSUCCESS, INCFAILNUM, REDUCESUCCESS, REDUCEFAILNUM)
  values (
    #{batchid,jdbcType=DECIMAL}, #{batchcode,jdbcType=VARCHAR},
    #{provcode,jdbcType=DECIMAL}, #{areacode,jdbcType=DECIMAL},
    #{begindate,jdbcType=TIMESTAMP}, #{enddate,jdbcType=TIMESTAMP},
    #{memo,jdbcType=VARCHAR}, #{fileiuploadpath,jdbcType=VARCHAR},
    #{processstatus,jdbcType=DECIMAL}, #{createname,jdbcType=VARCHAR},
    #{modifyname,jdbcType=VARCHAR}, #{createtime,jdbcType=TIMESTAMP},
    #{modifytime,jdbcType=TIMESTAMP}, #{servicecode,jdbcType=VARCHAR},
    #{uploadnum,jdbcType=DECIMAL}, #{status,jdbcType=DECIMAL},
    #{incsuccess,jdbcType=DECIMAL}, #{incfailnum,jdbcType=DECIMAL},
    #{reducesuccess,jdbcType=DECIMAL}, #{reducefailnum,jdbcType=DECIMAL})
</insert>
(4)最终涉及的表BATCH_DIFFCONFIG
②执行工单:
(1)quanyi_list.js
/**
 * Submits the execute form to quanyi/excute.htmls and reacts to the reply:
 * S_OK closes the dialog and reloads the grid, S_TIMEOUT redirects to the
 * login page, anything else is shown as an error message.
 */
function excuteOrder() {
    var notify = function (title, text) {
        $.messager.show({
            title: title,
            msg: text
        });
    };

    var handleReply = function (result) {
        // NOTE(review): eval() executes whatever the server sends back; if the
        // endpoint always returns strict JSON, JSON.parse would be the safer choice.
        var reply = eval('(' + result + ')');
        console.log(reply);

        if (reply.code == "S_TIMEOUT") {
            redirectToLogin(reply.msg);
            return;
        }
        if (reply.code != "S_OK") {
            notify('Error', reply.msg);
            return;
        }

        $('#excute-div').dialog('close'); // close the dialog
        $('#dg').datagrid('reload');
        notify('Success', reply.msg);
    };

    $('#excute-fm').form('submit', {
        url: 'quanyi/excute.htmls',
        onSubmit: function () {
            // Returning false cancels the submit when validation fails.
            return $(this).form('validate');
        },
        success: handleReply
    });
}
(2)BatchDiffconfigController控制器
1 @Controller 2 @RequestMapping(value = "/quanyi") 3 public class BatchDiffconfigController { 4 @RequestMapping(value = "/excute") 5 @ResponseBody 6 public String excute(HttpServletRequest request, HttpServletResponse response, UserDiffConfigPO userDiffConfigPO, 7 @RequestParam("batchid") long batchid) { 8 LOG.info(userDiffConfigPO.toString()); 9 return batchDiffconfigService.excute(userDiffConfigPO, batchid); 10 } 11 }
(3)业务代码
1 @Service 2 public class BatchDiffconfigServiceImpl implements BatchDiffconfigService { 3 @Override 4 public String excute(UserDiffConfigPO userDiffConfigPO, long batchid) { 5 BatchDiffconfigPO batchDiffconfigPO = findByPrimaryKey(batchid); 6 if (batchDiffconfigPO == null) { 7 return ResultTemplate.fail("工单不存在"); 8 } 9 10 String uploadpath = batchDiffconfigPO.getFileiuploadpath(); 11 if (StringUtils.isEmpty(uploadpath)) { 12 return ResultTemplate.fail("未上传文件."); 13 } 14 StringBuilder sb = new StringBuilder(); 15 String filePath = sb.append(Constants.FILE_UPLOAD_ROOT_PATH).append("/") 16 .append(batchDiffconfigPO.getFileiuploadpath()).toString(); 17 File zipFile = new File(filePath); 18 if (!zipFile.exists()) { 19 LOG.error("[BatchDiffconfigServiceImpl] zipFile not exist. zipFileName:{}", filePath); 20 return ResultTemplate.fail("上传文件不存在,请重新上传."); 21 } 22 23 String fullFilePath = Constants.FILE_UPLOAD_ROOT_PATH + "/" + batchDiffconfigPO.getFileiuploadpath(); 24 if (batchDiffconfigPO.getProcessstatus() != Constants.BATCH_ORDER_READIY 25 && batchDiffconfigPO.getProcessstatus() != Constants.BATCH_ORDER_FAILED) { 26 LOG.error("[BatchDiffconfigServiceImpl].excute fail .status error.status:{}", 27 batchDiffconfigPO.getProcessstatus()); 28 return ResultTemplate.fail("该状态下不能执行任务"); 29 } 30 BatchDiffconfigPO newBatchDiffconfigPO = new BatchDiffconfigPO(); 31 newBatchDiffconfigPO.setBatchid(batchid); 32 newBatchDiffconfigPO.setModifytime(new Date()); 33 newBatchDiffconfigPO.setProcessstatus(Constants.BATCH_ORDER_DEALING); 34 int resultCode = batchDiffconfigPOMapper.updateByPrimaryKeySelective(newBatchDiffconfigPO); 35 if (resultCode == 0) { 36 LOG.error("[MailNotifyServiceImpl].excuteUpdateMailNotify update fail.batchid: {},batchcode :{}", batchid, 37 batchDiffconfigPO.getBatchcode()); 38 return ResultTemplate.fail("更新工单状态失败"); 39 } 40 BatchDiffconfigOrderMainThread batchDiffconfigOrderMainThread = new BatchDiffconfigOrderMainThread(this, 41 batchDiffconfigPO, 
userDiffConfigPO); 42 Thread thread = new Thread(batchDiffconfigOrderMainThread); 43 thread.setName("UpdateMailNotifyMainThread"); 44 thread.start(); 45 return ResultTemplate.success(); 46 47 } 48 }
(4)多线程处理
1、BatchDiffconfigOrderMainThread的run()方法执行
1 /* 2 * 3 * @ClassName: BatchDiffconfigOrderMainThread 4 * @Description: 任务工单主线程 5 * @author JaceJiang 6 * @date 2018-1-18 下午2:15:46 7 */ 8 public class BatchDiffconfigOrderMainThread implements Runnable { 9 private static final Logger LOG = LoggerFactory.getLogger(BatchDiffconfigOrderMainThread.class); 10 11 private BatchDiffconfigService batchDiffconfigService; 12 private BatchDiffconfigPO batchDiffconfigPO; 13 private UserDiffConfigPO userDiffConfigPO; 14 15 public BatchDiffconfigOrderMainThread(BatchDiffconfigService batchDiffconfigService, 16 BatchDiffconfigPO batchDiffconfigPO, UserDiffConfigPO userDiffConfigPO) { 17 this.batchDiffconfigService = batchDiffconfigService; 18 this.batchDiffconfigPO = batchDiffconfigPO; 19 this.userDiffConfigPO = userDiffConfigPO; 20 } 21 22 @Override 23 public void run() { 24 ExecutorService executor = Executors.newFixedThreadPool(1); 25 Callable<Boolean> batchDiffconfigOrderTask = new BatchDiffconfigOrderTask(batchDiffconfigService, 26 batchDiffconfigPO, userDiffConfigPO); 27 BatchDiffconfigOrderFutureTask ft = new BatchDiffconfigOrderFutureTask(batchDiffconfigOrderTask, 28 batchDiffconfigService, batchDiffconfigPO); 29 executor.submit(ft); 30 executor.shutdown(); 31 LOG.info("[BatchDiffconfigOrderMainThread] run()"); 32 } 33 }
2、BatchDiffconfigOrderTask:处理工单主任务,消费和生产线程在此创建,最终等待结果并且更新相应记录
1 /* 2 * 3 * @ClassName: BatchDiffconfigOrderTask 4 * @Description: 处理工单主任务,消费和生产线程在此创建,最终等待结果并且更新相应记录 5 * @author JaceJiang 6 * @date 2018-1-18 下午2:17:02 7 */ 8 public class BatchDiffconfigOrderTask implements Callable<Boolean> { 9 10 private static final Logger LOG = LoggerFactory.getLogger(BatchDiffconfigOrderTask.class); 11 12 private static final int THREAD_NUMS = Integer.valueOf(SysConfig.getProperty("UPDATE_MAIL_NOTIFY_THREAD", "1")); 13 14 private BatchDiffconfigService batchDiffconfigService; 15 private BatchDiffconfigPO batchDiffconfigPO; 16 private UserDiffConfigPO userDiffConfigPO; 17 18 public BatchDiffconfigOrderTask(BatchDiffconfigService batchDiffconfigService, BatchDiffconfigPO batchDiffconfigPO, 19 UserDiffConfigPO userDiffConfigPO) { 20 this.batchDiffconfigService = batchDiffconfigService; 21 this.batchDiffconfigPO = batchDiffconfigPO; 22 this.userDiffConfigPO = userDiffConfigPO; 23 } 24 25 @Override 26 public Boolean call() throws Exception { 27 CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUMS); 28 BlockingQueue<String> taskQueues = new LinkedBlockingQueue<String>(1000);//阻塞队列 29 AtomicInteger successCounter = new AtomicInteger(0); 30 AtomicInteger failCounter = new AtomicInteger(0); 31 StringBuffer sb = new StringBuffer(); 32 String fileUrl = batchDiffconfigPO.getFileiuploadpath(); 33 String filePath = sb.append(Constants.FILE_UPLOAD_ROOT_PATH).append("/").append(fileUrl).toString(); 34 35 long begin = System.currentTimeMillis(); 36 LOG.info("[BatchDiffconfigOrderTask] run batchid:{}.batchcode:{}.filePath:{}", batchDiffconfigPO.getBatchid(), 37 batchDiffconfigPO.getBatchcode(), filePath); 38 File zipFile = new File(filePath); 39 if (!zipFile.exists()) { 40 LOG.error("[BatchDiffconfigOrderTask] zipFile not exist. 
zipFileName:{}", filePath); 41 return false; 42 } 43 44 LOG.info("[BatchDiffconfigOrderTask] UpdateMailNotifyConsumeThread open thread."); 45 for (int i = 0; i < THREAD_NUMS; i++) { 46 Thread thread = new Thread(new BatchDiffconfigOrderConsumeThread(taskQueues, successCounter, failCounter, 47 countDownLatch, userDiffConfigPO, batchDiffconfigService));//消费者 48 thread.setName("BatchDiffconfigOrderTask " + i); 49 thread.start(); 50 } 51 52 LOG.info("[BatchDiffconfigOrderTask] UpdateMailNotifyProduceThread open thread."); 53 Thread thread = new Thread(new BatchDiffconfigOrderProduceThread(taskQueues, filePath)); 54 thread.setName("UpdateMailNotifyProduceThread ");//生产者 55 thread.start(); 56 try { 57 countDownLatch.await(6, TimeUnit.HOURS); 58 } catch (InterruptedException e1) { 59 LOG.error("[BatchDiffconfigOrderTask]InterruptedException excuteUpdateTask open thread."); 60 e1.printStackTrace(); 61 } 62 LOG.info("[BatchDiffconfigOrderTask] excuteUpdateTask update BatchDiffconfigPO."); 63 BatchDiffconfigPO newBatchDiffconfigPO = new BatchDiffconfigPO(); 64 newBatchDiffconfigPO.setBatchid(batchDiffconfigPO.getBatchid()); 65 newBatchDiffconfigPO.setModifytime(new Date()); 66 newBatchDiffconfigPO.setProcessstatus(Constants.BATCH_ORDER_FINISHED); 67 newBatchDiffconfigPO.setIncsuccess(Long.valueOf(successCounter.get())); 68 newBatchDiffconfigPO.setIncfailnum(Long.valueOf(failCounter.get())); 69 newBatchDiffconfigPO.setUploadnum(newBatchDiffconfigPO.getIncfailnum() + newBatchDiffconfigPO.getIncsuccess()); 70 int resultCode = batchDiffconfigService.update(newBatchDiffconfigPO); 71 LOG.info( 72 "[BatchDiffconfigOrderTask] ======================================= excuteUpdateTask. 
batchid{},batchcode{}, Total cost {} s.", 73 newBatchDiffconfigPO.getBatchid(), newBatchDiffconfigPO.getBatchcode(), 74 (System.currentTimeMillis() - begin) * 0.001f); 75 if (resultCode == 0) { 76 LOG.error("[BatchDiffconfigOrderTask] updateByPrimaryKey fail.{}", newBatchDiffconfigPO); 77 return false; 78 } 79 return true; 80 } 81 82 }
3、消费者线程(读取队列中的号码请求ps,获取uin,存入数据库)
1 package com.richinfo.quanyi.thread; 2 3 import java.sql.Connection; 4 import java.sql.DriverManager; 5 import java.sql.PreparedStatement; 6 import java.sql.SQLException; 7 import java.util.HashMap; 8 import java.util.Map; 9 import java.util.Map.Entry; 10 import java.util.concurrent.BlockingQueue; 11 import java.util.concurrent.CountDownLatch; 12 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.atomic.AtomicInteger; 14 15 import org.slf4j.Logger; 16 import org.slf4j.LoggerFactory; 17 import org.springframework.util.StringUtils; 18 19 import com.richinfo.common.PsfServer; 20 import com.richinfo.common.SysConfig; 21 import com.richinfo.pfsc.bean.GPFSBean.MsgType; 22 import com.richinfo.quanyi.model.UserDiffConfigPO; 23 import com.richinfo.quanyi.service.BatchDiffconfigService; 24 25 /* 26 * 27 * @ClassName: BatchDiffconfigOrderConsumeThread 28 * @Description: 消耗线程,读取队列中的号码请求ps,获取uin,存入数据库 29 * @author JaceJiang 30 * @date 2018-1-18 下午2:13:46 31 */ 32 public class BatchDiffconfigOrderConsumeThread implements Runnable { 33 34 private static final Logger LOG = LoggerFactory.getLogger(BatchDiffconfigOrderConsumeThread.class); 35 private BlockingQueue<String> taskQueues; 36 private AtomicInteger successCounter; 37 private AtomicInteger failCounter; 38 private CountDownLatch countDownLatch; 39 private UserDiffConfigPO userDiffConfigPO; 40 private BatchDiffconfigService batchDiffconfigService; 41 42 public BatchDiffconfigOrderConsumeThread(BlockingQueue<String> taskQueues, AtomicInteger successCounter, 43 AtomicInteger failCounter, CountDownLatch countDownLatch, UserDiffConfigPO userDiffConfigPO, 44 BatchDiffconfigService batchDiffconfigService) { 45 this.taskQueues = taskQueues; 46 this.successCounter = successCounter; 47 this.failCounter = failCounter; 48 this.countDownLatch = countDownLatch; 49 this.userDiffConfigPO = userDiffConfigPO; 50 this.batchDiffconfigService = batchDiffconfigService; 51 } 52 53 @Override 54 public void run() { 55 boolean tag 
= true; 56 String usernumber = null; 57 LOG.info("[BatchDiffconfigOrderConsumeThread]run() {} is running.", Thread.currentThread().getName()); 58 String url = SysConfig.getProperty("jdbc.url"); 59 String user = SysConfig.getProperty("jdbc.username"); 60 String password = SysConfig.getProperty("jdbc.password"); 61 Connection con = null; 62 PreparedStatement pst = null; 63 // /////////// 64 String url2 = SysConfig.getProperty("hd.jdbc.url"); 65 String user2 = SysConfig.getProperty("hd.jdbc.username"); 66 String password2 = SysConfig.getProperty("hd.jdbc.password"); 67 Connection con2 = null; 68 PreparedStatement pst2 = null; 69 // /////////// 70 71 java.sql.Date nowDate = new java.sql.Date(System.currentTimeMillis()); 72 try { 73 Class.forName("oracle.jdbc.driver.OracleDriver"); 74 con = DriverManager.getConnection(url, user, password); 75 con2 = DriverManager.getConnection(url2, user2, password2); 76 // 关闭事务自动提交 77 con.setAutoCommit(false); 78 con2.setAutoCommit(false); 79 String excuteSql = createDynSql(userDiffConfigPO); 80 pst = con.prepareStatement(excuteSql); 81 pst2 = con2.prepareStatement(excuteSql); 82 } catch (ClassNotFoundException e1) { 83 e1.printStackTrace(); 84 LOG.error("[BatchDiffconfigOrderConsumeThread]run()ClassNotFoundException.", e1); 85 return; 86 } catch (SQLException e1) { 87 e1.printStackTrace(); 88 LOG.error("[BatchDiffconfigOrderConsumeThread]run()SQLException.", e1); 89 return; 90 } 91 int counter = 0; 92 int counter2 = 0; 93 Map<String, String> cacheMaps = new HashMap<String, String>(512); 94 Map<String, String> cacheMaps2 = new HashMap<String, String>(512);// 灰度 95 while (tag) { 96 try { 97 usernumber = taskQueues.poll(10, TimeUnit.SECONDS); 98 } catch (InterruptedException e) { 99 e.printStackTrace(); 100 break; 101 } 102 if (usernumber == null) { 103 LOG.info("[BatchDiffconfigOrderConsumeThread]run() {} is finish.", Thread.currentThread().getName()); 104 break; 105 } 106 if (usernumber.length() != 11) { 107 
LOG.info(Thread.currentThread().getName() + "wrong number" + usernumber); 108 failCounter.incrementAndGet(); 109 LOG.error("[BatchDiffconfigOrderConsumeThread]run() number wrong.number:{}", usernumber); 110 } else { // 调用远程接口 111 String uin = PsfServer.reqPsServiceByMsgType(MsgType.MT_MAIL_NOTIFY_QUERY_VALUE, usernumber); 112 if (StringUtils.isEmpty(uin)) { 113 LOG.error("[BatchDiffconfigOrderConsumeThread]run() uin null.usernumber:{}", usernumber); 114 failCounter.incrementAndGet(); 115 continue; 116 } 117 118 String part = PsfServer.reqPsServiceByMsgType(MsgType.MT_GET_PARTID_VALUE, usernumber); 119 LOG.info("[BatchDiffconfigOrderConsumeThread]run() part:{}.usernumber:{}", part, usernumber); 120 if (StringUtils.isEmpty(part)) { 121 LOG.error("[BatchDiffconfigOrderConsumeThread]run() part null.usernumber:{}", usernumber); 122 failCounter.incrementAndGet(); 123 continue; 124 } else if ("1".equals(part)) {// 灰度分区 125 cacheMaps2.put(usernumber, uin); 126 boolean excuteOneByOneTag = false; 127 try { 128 pst2.setString(1, usernumber); 129 pst2.setLong(2, Long.valueOf(uin)); 130 pst2.setDate(3, nowDate); 131 pst2.addBatch(); 132 counter2++; 133 if (counter2 % 500 == 0) { 134 LOG.info("[BatchDiffconfigOrderConsumeThread]run() batch insert . 
now counter {}", counter2); 135 // 执行批量更新 136 try { 137 pst2.executeBatch(); 138 // 语句执行完毕,提交本事务 139 con2.commit(); 140 } catch (Exception e) { 141 e.printStackTrace(); 142 LOG.error("[BatchDiffconfigOrderConsumeThread]run() executeBatch error.", e); 143 excuteOneByOneTag = true; 144 } 145 if (excuteOneByOneTag) { 146 excuteOneByOneGrey(cacheMaps2, pst2, con2, nowDate, successCounter, failCounter); 147 } else { 148 successCounter.addAndGet(counter2); 149 counter2 = 0; 150 } 151 excuteOneByOneTag = false; 152 cacheMaps2.clear(); 153 } 154 /* 155 * if (StringUtils.isEmpty(uin) || "0".equals(uin)) { 156 * failCounter.incrementAndGet(); } else { 157 * successCounter.incrementAndGet(); } 158 */ 159 } catch (SQLException e) { 160 cacheMaps2.clear(); 161 e.printStackTrace(); 162 LOG.error("[BatchDiffconfigOrderConsumeThread]run() batch insert . now counter {}", counter2, e); 163 } 164 } else {// ("12".equals(part)) { 全网分区 165 cacheMaps.put(usernumber, uin); 166 boolean excuteOneByOneTag = false; 167 try { 168 pst.setString(1, usernumber); 169 pst.setLong(2, Long.valueOf(uin)); 170 pst.setDate(3, nowDate); 171 pst.addBatch(); 172 counter++; 173 if (counter % 500 == 0) { 174 LOG.info("[BatchDiffconfigOrderConsumeThread]run() batch insert . 
now counter {}", counter); 175 // 执行批量更新 176 try { 177 pst.executeBatch(); 178 // 语句执行完毕,提交本事务 179 con.commit(); 180 } catch (Exception e) { 181 e.printStackTrace(); 182 LOG.error("[BatchDiffconfigOrderConsumeThread]run() executeBatch error.", e); 183 excuteOneByOneTag = true; 184 } 185 if (excuteOneByOneTag) { 186 excuteOneByOne(cacheMaps, pst, con, nowDate, successCounter, failCounter); 187 } else { 188 successCounter.addAndGet(counter); 189 counter = 0; 190 } 191 excuteOneByOneTag = false; 192 cacheMaps.clear(); 193 } 194 /* 195 * if (StringUtils.isEmpty(uin) || "0".equals(uin)) { 196 * failCounter.incrementAndGet(); } else { 197 * successCounter.incrementAndGet(); } 198 */ 199 } catch (SQLException e) { 200 cacheMaps.clear(); 201 e.printStackTrace(); 202 LOG.error("[BatchDiffconfigOrderConsumeThread]run() batch insert . now counter {}", counter, e); 203 } 204 } 205 } 206 usernumber = null; 207 } 208 ////////////////////// 209 if (counter > 0) { 210 boolean excuteOneByOneTag = false; 211 try { 212 pst.executeBatch(); 213 con.commit(); 214 } catch (SQLException e) { 215 e.printStackTrace(); 216 LOG.error("[BatchDiffconfigOrderConsumeThread]run() batch insert . now counter {}", counter, e); 217 excuteOneByOneTag = true; 218 } 219 if (excuteOneByOneTag) { 220 excuteOneByOne(cacheMaps, pst, con, nowDate, successCounter, failCounter); 221 } else { 222 successCounter.addAndGet(counter); 223 counter = 0; 224 } 225 } 226 if (counter2 > 0) { 227 boolean excuteOneByOneTag = false; 228 try { 229 pst2.executeBatch(); 230 con2.commit(); 231 } catch (SQLException e) { 232 e.printStackTrace(); 233 LOG.error("[BatchDiffconfigOrderConsumeThread]run() batch insert . 
now counter {}", counter2, e); 234 excuteOneByOneTag = true; 235 } 236 if (excuteOneByOneTag) { 237 excuteOneByOneGrey(cacheMaps2, pst2, con2, nowDate, successCounter, failCounter); 238 } else { 239 successCounter.addAndGet(counter2); 240 counter2 = 0; 241 } 242 } 243 try { 244 pst.close(); 245 con.close(); 246 pst2.close(); 247 con2.close(); 248 } catch (SQLException e) { 249 LOG.error("[BatchDiffconfigOrderConsumeThread]run() close error.", e); 250 } 251 cacheMaps.clear(); 252 cacheMaps2.clear(); 253 countDownLatch.countDown(); 254 } 255 256 private String createDynSql(UserDiffConfigPO userDiffConfigPO) { 257 StringBuilder sbFront = new StringBuilder(); 258 StringBuilder sbBehind = new StringBuilder(); 259 sbFront.append("insert into OS_USERDIFFCONFIG_INFO (SEQ,USERNUMBER,UIN,CREATETIME,"); 260 sbBehind.append("values (get_seq('SEQ_OS_USERDIFFCONFIG_INFO'),?,?,?,"); 261 if (userDiffConfigPO.getAttmailboxcapacity() != null) { 262 sbFront.append("ATTMAILBOXCAPACITY,"); 263 sbBehind.append(userDiffConfigPO.getAttmailboxcapacity() + ","); 264 } 265 if (userDiffConfigPO.getMaxannexsize() != null) { 266 sbFront.append("MAXANNEXSIZE,"); 267 sbBehind.append(userDiffConfigPO.getMaxannexsize() + ","); 268 } 269 if (userDiffConfigPO.getDiskcapacity() != null) { 270 sbFront.append("DISKCAPACITY,"); 271 sbBehind.append(userDiffConfigPO.getDiskcapacity() + ","); 272 } 273 if (userDiffConfigPO.getStorecabinetcapacity() != null) { 274 sbFront.append("STORECABINETCAPACITY,"); 275 sbBehind.append(userDiffConfigPO.getStorecabinetcapacity() + ","); 276 } 277 if (userDiffConfigPO.getDiskfilesize() != null) { 278 sbFront.append("DISKFILESIZE,"); 279 sbBehind.append(userDiffConfigPO.getDiskfilesize() + ","); 280 } 281 if (userDiffConfigPO.getMailgsendlimit() != null) { 282 sbFront.append("MAILGSENDLIMIT,"); 283 sbBehind.append(userDiffConfigPO.getMailgsendlimit() + ","); 284 } 285 if (userDiffConfigPO.getSmsgsendlimit() != null) { 286 sbFront.append("SMSGSENDLIMIT,"); 287 
sbBehind.append(userDiffConfigPO.getSmsgsendlimit() + ","); 288 } 289 if (userDiffConfigPO.getMmsgsendlimit() != null) { 290 sbFront.append("MMSGSENDLIMIT,"); 291 sbBehind.append(userDiffConfigPO.getMmsgsendlimit() + ","); 292 } 293 if (userDiffConfigPO.getAddrstorenum() != null) { 294 sbFront.append("ADDRSTORENUM,"); 295 sbBehind.append(userDiffConfigPO.getAddrstorenum() + ","); 296 } 297 if (userDiffConfigPO.getDisksharenum() != null) { 298 sbFront.append("DISKSHARENUM,"); 299 sbBehind.append(userDiffConfigPO.getDisksharenum() + ","); 300 } 301 if (userDiffConfigPO.getTranscribetimelen() != null) { 302 sbFront.append("TRANSCRIBETIMELEN,"); 303 sbBehind.append(userDiffConfigPO.getTranscribetimelen() + ","); 304 } 305 if (userDiffConfigPO.getFilesharenum() != null) { 306 sbFront.append("FILESHARENUM,"); 307 sbBehind.append(userDiffConfigPO.getFilesharenum() + ","); 308 } 309 if (userDiffConfigPO.getIntegralmultiple() != null) { 310 sbFront.append("INTEGRALMULTIPLE,"); 311 sbBehind.append(userDiffConfigPO.getIntegralmultiple() + ","); 312 } 313 if (userDiffConfigPO.getSmsnotifynum() != null) { 314 sbFront.append("SMSNOTIFYNUM,"); 315 sbBehind.append(userDiffConfigPO.getSmsnotifynum() + ","); 316 } 317 if (userDiffConfigPO.getMmsnotifynum() != null) { 318 sbFront.append("MMSNOTIFYNUM,"); 319 sbBehind.append(userDiffConfigPO.getMmsnotifynum() + ","); 320 } 321 if (userDiffConfigPO.getSmsmonthlimit() != null) { 322 sbFront.append("SMSMONTHLIMIT,"); 323 sbBehind.append(userDiffConfigPO.getSmsmonthlimit() + ","); 324 } 325 if (userDiffConfigPO.getMmsdaylimit() != null) { 326 sbFront.append("MMSDAYLIMIT,"); 327 sbBehind.append(userDiffConfigPO.getMmsdaylimit() + ","); 328 } 329 if (userDiffConfigPO.getMmsmonthlimit() != null) { 330 sbFront.append("MMSMONTHLIMIT,"); 331 sbBehind.append(userDiffConfigPO.getMmsmonthlimit() + ","); 332 } 333 if (userDiffConfigPO.getMaildaylimit() != null) { 334 sbFront.append("MAILDAYLIMIT,"); 335 
sbBehind.append(userDiffConfigPO.getMaildaylimit() + ","); 336 } 337 if (userDiffConfigPO.getMailmonthlimit() != null) { 338 sbFront.append("MAILMONTHLIMIT,"); 339 sbBehind.append(userDiffConfigPO.getMailmonthlimit() + ","); 340 } 341 if (userDiffConfigPO.getAliasnum() != null) { 342 sbFront.append("ALIASNUM,"); 343 sbBehind.append(userDiffConfigPO.getAliasnum() + ","); 344 } 345 if (userDiffConfigPO.getAliaslen() != null) { 346 sbFront.append("ALIASLEN,"); 347 sbBehind.append(userDiffConfigPO.getAliaslen() + ","); 348 } 349 if (!StringUtils.isEmpty(userDiffConfigPO.getVipdomain())) { 350 sbFront.append("VIPDOMAIN,"); 351 sbBehind.append(userDiffConfigPO.getVipdomain() + ","); 352 } 353 if (!StringUtils.isEmpty(userDiffConfigPO.getVipidentity())) { 354 sbFront.append("VIPIDENTITY,"); 355 sbBehind.append(userDiffConfigPO.getVipidentity() + ","); 356 } 357 if (!StringUtils.isEmpty(userDiffConfigPO.getCosid())) { 358 sbFront.append("COSID,"); 359 sbBehind.append(userDiffConfigPO.getCosid() + ","); 360 } 361 if (!StringUtils.isEmpty(userDiffConfigPO.getMailgradeid())) { 362 sbFront.append("MAILGRADEID,"); 363 sbBehind.append(userDiffConfigPO.getMailgradeid() + ","); 364 } 365 if (!StringUtils.isEmpty(userDiffConfigPO.getLetterpaperid())) { 366 sbFront.append("LETTERPAPERID,"); 367 sbBehind.append(userDiffConfigPO.getLetterpaperid() + ","); 368 } 369 if (!StringUtils.isEmpty(userDiffConfigPO.getCongracardid())) { 370 sbFront.append("CONGRACARDID,"); 371 sbBehind.append(userDiffConfigPO.getCongracardid() + ","); 372 } 373 if (!StringUtils.isEmpty(userDiffConfigPO.getPostcardid())) { 374 sbFront.append("POSTCARDID,"); 375 sbBehind.append(userDiffConfigPO.getPostcardid() + ","); 376 } 377 if (!StringUtils.isEmpty(userDiffConfigPO.getDermaid())) { 378 sbFront.append("DERMAID,"); 379 sbBehind.append(userDiffConfigPO.getDermaid() + ","); 380 } 381 if (!StringUtils.isEmpty(userDiffConfigPO.getRemark())) { 382 sbFront.append("REMARK,"); 383 
sbBehind.append(userDiffConfigPO.getRemark() + ","); 384 } 385 if (userDiffConfigPO.getFilesharesavedays() != null) { 386 sbFront.append("FILESHARESAVEDAYS,"); 387 sbBehind.append(userDiffConfigPO.getFilesharesavedays() + ","); 388 } 389 if (userDiffConfigPO.getSmspresentnum() != null) { 390 sbFront.append("SMSPRESENTNUM,"); 391 sbBehind.append(userDiffConfigPO.getSmspresentnum() + ","); 392 } 393 if (userDiffConfigPO.getMmspresentnum() != null) { 394 sbFront.append("MMSPRESENTNUM,"); 395 sbBehind.append(userDiffConfigPO.getMmspresentnum() + ","); 396 } 397 if (userDiffConfigPO.getSmsdaylimit() != null) { 398 sbFront.append("SMSDAYLIMIT,"); 399 sbBehind.append(userDiffConfigPO.getSmsdaylimit() + ","); 400 } 401 402 String front = sbFront.substring(0, sbFront.lastIndexOf(",")) + ")"; 403 String behind = sbBehind.substring(0, sbBehind.lastIndexOf(",")) + ")"; 404 return front + behind; 405 } 406 407 private void excuteOneByOne(Map<String, String> retryMaps, PreparedStatement pst, Connection con, 408 java.sql.Date nowDate, AtomicInteger success, AtomicInteger fail) { 409 if (retryMaps.size() == 0) 410 return; 411 for (Entry<String, String> entrys : retryMaps.entrySet()) { 412 String userNumber = entrys.getKey(); 413 if (batchDiffconfigService.countByUsernumber(userNumber) > 0) { 414 LOG.error("[BatchDiffconfigOrderConsumeThread]run() excuteOneByOne() userNumber exist.{}", userNumber); 415 fail.incrementAndGet(); 416 continue; 417 } 418 try { 419 pst.setString(1, entrys.getKey()); 420 pst.setLong(2, Long.valueOf(entrys.getValue())); 421 pst.setDate(3, nowDate); 422 pst.addBatch(); 423 pst.executeBatch(); 424 con.commit(); 425 success.incrementAndGet(); 426 } catch (SQLException e) { 427 fail.incrementAndGet(); 428 LOG.error("[BatchDiffconfigOrderConsumeThread]run() excuteOneByOne() error.", e); 429 } 430 } 431 } 432 433 private void excuteOneByOneGrey(Map<String, String> retryMaps, PreparedStatement pst, Connection con, 434 java.sql.Date nowDate, AtomicInteger success, 
AtomicInteger fail) { 435 if (retryMaps.size() == 0) 436 return; 437 for (Entry<String, String> entrys : retryMaps.entrySet()) { 438 String userNumber = entrys.getKey(); 439 if (batchDiffconfigService.countByUsernumberGrey(userNumber) > 0) { 440 LOG.error("[BatchDiffconfigOrderConsumeThread]run() excuteOneByOne() userNumber exist.{}", userNumber); 441 fail.incrementAndGet(); 442 continue; 443 } 444 try { 445 pst.setString(1, entrys.getKey()); 446 pst.setLong(2, Long.valueOf(entrys.getValue())); 447 pst.setDate(3, nowDate); 448 pst.addBatch(); 449 pst.executeBatch(); 450 con.commit(); 451 success.incrementAndGet(); 452 } catch (SQLException e) { 453 fail.incrementAndGet(); 454 LOG.error("[BatchDiffconfigOrderConsumeThread]run() excuteOneByOne() error.", e); 455 } 456 } 457 } 458 459 public static void main(String[] agrs) { 460 String sql = "1,2,3,4,5,"; 461 String result = sql.substring(0, sql.lastIndexOf(",")); 462 System.out.println(result); 463 } 464 }
4、生产者线程(读取压缩文件,将号码放进队列)
1 package com.richinfo.quanyi.thread; 2 3 import java.io.BufferedInputStream; 4 import java.io.BufferedReader; 5 import java.io.FileInputStream; 6 import java.io.InputStream; 7 import java.io.InputStreamReader; 8 import java.util.concurrent.BlockingQueue; 9 import java.util.concurrent.TimeUnit; 10 import java.util.zip.ZipEntry; 11 import java.util.zip.ZipFile; 12 import java.util.zip.ZipInputStream; 13 14 import org.slf4j.Logger; 15 import org.slf4j.LoggerFactory; 16 17 /* 18 * 19 * @ClassName: BatchDiffconfigOrderProduceThread 20 * @Description: 生产线程,读取压缩文件,将号码放进队列 21 * @author JaceJiang 22 * @date 2018-1-18 下午2:16:07 23 */ 24 public class BatchDiffconfigOrderProduceThread implements Runnable { 25 26 private static final Logger LOG = LoggerFactory.getLogger(BatchDiffconfigOrderProduceThread.class); 27 private BlockingQueue<String> taskQueues; 28 private String fileUrl; 29 30 public BatchDiffconfigOrderProduceThread(BlockingQueue<String> taskQueues, String fileUrl) { 31 this.taskQueues = taskQueues; 32 this.fileUrl = fileUrl; 33 } 34 35 @Override 36 public void run() { 37 LOG.info("[BatchDiffconfigOrderProduceThread] readZipFile.fileUrl :{} ", fileUrl); 38 try { 39 ZipFile zf = new ZipFile(fileUrl); 40 InputStream in = new BufferedInputStream(new FileInputStream(fileUrl)); 41 ZipInputStream zin = new ZipInputStream(in); 42 ZipEntry ze; 43 while ((ze = zin.getNextEntry()) != null) { 44 if (ze.isDirectory()) { 45 LOG.info("[BatchDiffconfigOrderProduceThread] {} is directory.", ze.getName()); 46 } else { 47 System.err.println("file - " + ze.getName() + " : " + ze.getSize() + " bytes"); 48 long size = ze.getSize(); 49 if (size > 0) { 50 BufferedReader br = new BufferedReader(new InputStreamReader(zf.getInputStream(ze))); 51 String line = ""; 52 boolean tags = true; 53 while (tags && (line = br.readLine()) != null) { 54 tags = taskQueues.offer(line, 10, TimeUnit.SECONDS); 55 } 56 br.close(); 57 } 58 } 59 } 60 zin.close(); 61 in.close(); 62 zf.close(); 63 } catch 
(Exception e) { 64 e.printStackTrace(); 65 LOG.info("[BatchDiffconfigOrderProduceThread] wrong. {}", e); 66 } 67 } 68 }
5、BatchDiffconfigOrderFutureTask:处理任务结束后,对处理结果进行下一步操作
1 /* 2 * 3 * @ClassName: BatchDiffconfigOrderFutureTask 4 * @Description: FutureTask,处理任务结束后,对处理结果进行下一步操作 5 * @author JaceJiang 6 * @date 2018-1-18 下午2:14:39 7 */ 8 public class BatchDiffconfigOrderFutureTask extends FutureTask<Boolean> { 9 10 private static final Logger LOG = LoggerFactory.getLogger(BatchDiffconfigOrderFutureTask.class); 11 private BatchDiffconfigPO batchDiffconfigPO; 12 private BatchDiffconfigService batchDiffconfigService; 13 14 public BatchDiffconfigOrderFutureTask(Callable<Boolean> callable, BatchDiffconfigService batchDiffconfigService, 15 BatchDiffconfigPO batchDiffconfigPO) { 16 super(callable); 17 this.batchDiffconfigPO = batchDiffconfigPO; 18 this.batchDiffconfigService = batchDiffconfigService; 19 } 20 21 @Override 22 protected void done() { 23 Boolean result = false; 24 try { 25 result = get(60, TimeUnit.SECONDS); 26 Long batchid = batchDiffconfigPO.getBatchid(); 27 if (!result) { 28 BatchDiffconfigPO newBatchDiffconfigPO = new BatchDiffconfigPO(); 29 newBatchDiffconfigPO.setModifytime(new Date()); 30 newBatchDiffconfigPO.setStatus(Constants.BATCH_ORDER_FAILED); 31 newBatchDiffconfigPO.setBatchid(batchid); 32 if (batchDiffconfigService.update(newBatchDiffconfigPO) == 0) { 33 LOG.error("[BatchDiffconfigOrderFutureTask] update fail.result:{},batchid:{}", result, batchid); 34 } 35 } 36 LOG.info("[BatchDiffconfigOrderFutureTask] done.result:{},batchid:{}", result, batchid); 37 } catch (TimeoutException | InterruptedException | ExecutionException e) { 38 LOG.error("[BatchDiffconfigOrderFutureTask] error.{}", e); 39 e.printStackTrace(); 40 } 41 } 42 }