/**
 * Thread-pool and paging configuration for sending system messages in batches.
 *
 * <p>This enum declares no instances; it only serves as a holder for constants.
 */
public enum SystemMsgSendConfig {
    ;

    /** Maximum number of threads the pool may grow to. */
    public static final int POOL_MAXIMUM_SIZE = 10;

    /** Keep-alive time for idle pool threads (the time unit is chosen by the caller). */
    public static final int POOL_KEEP_ALIVE_TIME = 1;

    /** Time-out used when waiting for the pool to terminate (the time unit is chosen by the caller). */
    public static final int POOL_AWAIT_TIME_OUT = 20;

    /** Number of threads. */
    public static final int POOL_TRREAD_NUM = 3;

    /** Page size. */
    public static final Integer size = 500;

    /** Whether paging is enabled. */
    public static final Boolean paging = Boolean.TRUE;
}
/** * 自定义多线程 */ private void sendSystemNotifyToUser(NotifyAddForm form) { if(StringUtil.isEmpty(form.getSenderUserId())){ throw new BusinessException("发送用户不能为空!"); } //记录消息内容 String notifyId = saveNotifyContent(form); //0-主账号 1-子账号 2-全部 Integer selectType = NotifyScopeEnum.selectType(form.getScope()); /** * 批处理 */ Integer page = 1; int ALL_NUM = 0;//全部数量 long totalStartTime = System.currentTimeMillis(); // 获取开始时间 List<NotifyUser> notifyUserList = new ArrayList<>(); List<String> userIds = new ArrayList<>(); logger.info("[系统消息 - 批量发送 - 开始] notifyId={}, form={}", notifyId, JSON.toJSONString(form)); while (page > 0){ //获取分页数据 userIds = fetchUserList(userIds, selectType, paging, page, SystemMsgSendConfig.size); if(CollectionUtil.isEmpty(userIds)){ page = 0; break; } //组装数据 notifyUserList = getNotifyUserList(notifyUserList, userIds, notifyId, form); //跑批 long unitStartTime = System.currentTimeMillis(); // 获取开始时间 String threadName = Thread.currentThread().getName(); logger.info("[单次执行信息 - 开始] notifyId={}, 线程={}, 当前页码={}, 当前页大小={}条", notifyId, threadName, page, notifyUserList.size()); ALL_NUM = ALL_NUM + notifyUserList.size(); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(POOL_CORE_SIZE, SystemMsgSendConfig.POOL_MAXIMUM_SIZE, SystemMsgSendConfig.POOL_KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(SystemMsgSendConfig.POOL_TRREAD_NUM)); //int POOL_TRREAD_NUM = 3;//开启指定线程去分批处理数据 List<NotifyUser> thread_1 = new LinkedList<>(); List<NotifyUser> thread_2 = new LinkedList<>(); List<NotifyUser> thread_3 = new LinkedList<>(); try { CountDownLatch latch = new CountDownLatch(SystemMsgSendConfig.POOL_TRREAD_NUM); for (NotifyUser user : notifyUserList) { long bitNum = user.getTimeStamp() % SystemMsgSendConfig.POOL_TRREAD_NUM; if (bitNum == 0) { thread_1.add(user); } else if (bitNum == 1) { thread_2.add(user); } else if (bitNum == 2) { thread_3.add(user); } } threadPool.execute(new Thread(new UserTask(latch, thread_1, notifyId))); threadPool.execute(new 
Thread(new UserTask(latch, thread_2, notifyId))); threadPool.execute(new Thread(new UserTask(latch, thread_3, notifyId))); logger.info("[单次执行信息 - 线程] notifyId={}, 当前线程池大小={}, 当前队列大小={}", notifyId, threadPool.getPoolSize(), threadPool.getQueue().size()); latch.await();//同步阻塞,直到所有线程工作完成 threadPool.shutdown(); //支持等待以前提交的任务停止执行 //所有任务关闭请求或线程中断或超时,阻塞取消 threadPool.awaitTermination(SystemMsgSendConfig.POOL_AWAIT_TIME_OUT, TimeUnit.MINUTES); threadPool.shutdownNow(); logger.info("[单次执行信息 - 结束] notifyId={}, page={}, totalTime={}ms", notifyId, page, System.currentTimeMillis() - unitStartTime); } catch (Exception e) { page = 0; threadPool.shutdownNow(); logger.error("[系统消息-批量发送] exception-msg:", e); throw new BusinessException("系统消息-批量发送失败!"); } //设置分页信息 userIds.clear(); notifyUserList.clear(); page ++; } logger.info("[系统消息 - 批量发送 - 结束] notifyId={}, totalTime={}ms, allNum={}", notifyId, System.currentTimeMillis()-totalStartTime, ALL_NUM); return; }
/** * 执行任务 */
class UserTask implements Runnable { private CountDownLatch latch; List<NotifyUser> notifyUserList = null; String notifyId = null; public UserTask(CountDownLatch latch, List<NotifyUser> notifyUserList, String notifyId) { this.latch = latch; this.notifyUserList = notifyUserList; this.notifyId = notifyId; } @Override public void run() { try { //发送系统消息 notifyUserService.batchSave(notifyUserList); } catch (Exception e) { logger.error("[系统消息-用户批量发送->UserTask->(Thread)Run],error is {}", e); } finally { latch.countDown(); } } /** * 获取用户ids集合 * @param notifyUserList * @return */ private List<String> getUserIdsList(List<NotifyUser> notifyUserList) { List<String> list = new ArrayList<>(); notifyUserList.forEach(notifyUser -> { list.add(notifyUser.getUserId()); }); return list; } }