zoukankan      html  css  js  c++  java
  • 多线程-2

    /**
     * 线程配置
     */
    public enum SystemMsgSendConfig {
        ;
        public static final int POOL_MAXIMUM_SIZE = 10;
        public static final int POOL_KEEP_ALIVE_TIME = 1;
        public static final int POOL_AWAIT_TIME_OUT = 20;
        public static final int POOL_TRREAD_NUM = 3;//线程数
        public static final Integer size = 500;//页大小
        public static final Boolean paging = true;//是否分页
    }
    

      

         /**
    	 * 自定义多线程
    	 */
    	private void sendSystemNotifyToUser(NotifyAddForm form) {
    
    		if(StringUtil.isEmpty(form.getSenderUserId())){
    			throw new BusinessException("发送用户不能为空!");
    		}
    
    		//记录消息内容
    		String notifyId = saveNotifyContent(form);
    		//0-主账号 1-子账号 2-全部
    		Integer selectType = NotifyScopeEnum.selectType(form.getScope());
    
    		/**
    		 * 批处理
    		 */
    		Integer page = 1;
    		int ALL_NUM = 0;//全部数量
    		long totalStartTime = System.currentTimeMillis(); // 获取开始时间
    		List<NotifyUser> notifyUserList = new ArrayList<>();
    		List<String> userIds = new ArrayList<>();
    		logger.info("[系统消息 - 批量发送 - 开始] notifyId={}, form={}", notifyId, JSON.toJSONString(form));
    		while (page > 0){
    
    			//获取分页数据
    			userIds = fetchUserList(userIds, selectType, paging, page, SystemMsgSendConfig.size);
    			if(CollectionUtil.isEmpty(userIds)){
    				page = 0;
    				break;
    			}
    
    			//组装数据
    			notifyUserList = getNotifyUserList(notifyUserList, userIds, notifyId, form);
    
    			//跑批
    			long unitStartTime = System.currentTimeMillis(); // 获取开始时间
    			String threadName = Thread.currentThread().getName();
    			logger.info("[单次执行信息 - 开始] notifyId={}, 线程={}, 当前页码={}, 当前页大小={}条", notifyId, threadName, page, notifyUserList.size());
    			ALL_NUM = ALL_NUM + notifyUserList.size();
    			ThreadPoolExecutor threadPool = new ThreadPoolExecutor(POOL_CORE_SIZE,
    					SystemMsgSendConfig.POOL_MAXIMUM_SIZE,
    					SystemMsgSendConfig.POOL_KEEP_ALIVE_TIME,
    					TimeUnit.MINUTES,
    					new LinkedBlockingQueue<Runnable>(SystemMsgSendConfig.POOL_TRREAD_NUM));
    			//int POOL_TRREAD_NUM = 3;//开启指定线程去分批处理数据
    			List<NotifyUser> thread_1 = new LinkedList<>();
    			List<NotifyUser> thread_2 = new LinkedList<>();
    			List<NotifyUser> thread_3 = new LinkedList<>();
    			try {
    				CountDownLatch latch = new CountDownLatch(SystemMsgSendConfig.POOL_TRREAD_NUM);
    				for (NotifyUser user : notifyUserList) {
    					long bitNum = user.getTimeStamp() % SystemMsgSendConfig.POOL_TRREAD_NUM;
    					if (bitNum == 0) {
    						thread_1.add(user);
    					} else if (bitNum == 1) {
    						thread_2.add(user);
    					} else if (bitNum == 2) {
    						thread_3.add(user);
    					}
    				}
    				threadPool.execute(new Thread(new UserTask(latch, thread_1, notifyId)));
    				threadPool.execute(new Thread(new UserTask(latch, thread_2, notifyId)));
    				threadPool.execute(new Thread(new UserTask(latch, thread_3, notifyId)));
    				logger.info("[单次执行信息 - 线程] notifyId={}, 当前线程池大小={}, 当前队列大小={}", notifyId, threadPool.getPoolSize(), threadPool.getQueue().size());
    				latch.await();//同步阻塞,直到所有线程工作完成
    				threadPool.shutdown();
    				//支持等待以前提交的任务停止执行
    				//所有任务关闭请求或线程中断或超时,阻塞取消
    				threadPool.awaitTermination(SystemMsgSendConfig.POOL_AWAIT_TIME_OUT, TimeUnit.MINUTES);
    				threadPool.shutdownNow();
    				logger.info("[单次执行信息 - 结束] notifyId={}, page={}, totalTime={}ms", notifyId, page, System.currentTimeMillis() - unitStartTime);
    			} catch (Exception e) {
    				page = 0;
    				threadPool.shutdownNow();
    				logger.error("[系统消息-批量发送] exception-msg:", e);
    				throw new BusinessException("系统消息-批量发送失败!");
    			}
    
    			//设置分页信息
    			userIds.clear();
    			notifyUserList.clear();
    			page ++;
    		}
    		logger.info("[系统消息 - 批量发送 - 结束] notifyId={}, totalTime={}ms, allNum={}", notifyId, System.currentTimeMillis()-totalStartTime, ALL_NUM);
    		return;
    	}

     

         /**
    	 * 执行任务
    	 */
    class UserTask implements Runnable {
    
    		private CountDownLatch latch;
    		List<NotifyUser> notifyUserList = null;
    		String notifyId = null;
    		public UserTask(CountDownLatch latch, List<NotifyUser> notifyUserList, String notifyId) {
    			this.latch = latch;
    			this.notifyUserList = notifyUserList;
    			this.notifyId = notifyId;
    		}
    
    		@Override
    		public void run() {
    			try {
    				//发送系统消息
    				notifyUserService.batchSave(notifyUserList);
    			} catch (Exception e) {
    				logger.error("[系统消息-用户批量发送->UserTask->(Thread)Run],error is {}", e);
    			} finally {
    				latch.countDown();
    			}
    		}
    
    		/**
    		 * 获取用户ids集合
    		 * @param notifyUserList
    		 * @return
    		 */
    		private List<String> getUserIdsList(List<NotifyUser> notifyUserList) {
    			List<String> list = new ArrayList<>();
    			notifyUserList.forEach(notifyUser -> {
    				list.add(notifyUser.getUserId());
    			});
    			return list;
    		}
    
    	}
    

      

  • 相关阅读:
    [kuangbin带你飞]专题七 线段树
    [kuangbin带你飞]专题六 最小生成树
    [kuangbin带你飞]专题五 并查集
    [kuangbin带你飞]专题四 最短路练习
    [kuangbin带你飞]专题三 Dancing Links
    [kuangbin带你飞]专题二 搜索进阶
    [kuangbin带你飞]专题一 简单搜索
    常用算法模板
    ACM程序设计选修课——Problem E:(ds:图)公路村村通(Prim)
    HDU——5667Sequence(矩阵快速幂+费马小定理应用)
  • 原文地址:https://www.cnblogs.com/wanhua-wu/p/9036769.html
Copyright © 2011-2022 走看看