/**
 * Thread-pool and paging settings for sending system messages.
 *
 * <p>The enum declares no constants of its own; it only serves as a
 * non-instantiable holder for the static values below.
 */
public enum SystemMsgSendConfig {
    ; // intentionally empty constant list

    // ---- paging ----

    /** Page size. */
    public static final Integer size = Integer.valueOf(500);

    /** Whether paging is enabled. */
    public static final Boolean paging = Boolean.TRUE;

    // ---- thread pool ----

    /** Number of threads. */
    public static final int POOL_TRREAD_NUM = 3;

    /** Maximum pool size. */
    public static final int POOL_MAXIMUM_SIZE = 10;

    /** Keep-alive time; the time unit is chosen by the code that uses it. */
    public static final int POOL_KEEP_ALIVE_TIME = 1;

    /** Await time-out; the time unit is chosen by the code that uses it. */
    public static final int POOL_AWAIT_TIME_OUT = 20;
}
/**
 * Sends a system notification to users page by page, saving each page's
 * recipients concurrently.
 *
 * <p>Flow: validate the sender, persist the message content, then repeatedly
 * fetch a page of user ids, build the {@code NotifyUser} rows for it, split
 * the rows into {@code SystemMsgSendConfig.POOL_TRREAD_NUM} buckets by
 * timestamp and hand every non-empty bucket to a {@link UserTask}. The method
 * blocks until all tasks of a page have finished before fetching the next
 * page, and stops when a page comes back empty.
 *
 * <p>One thread pool is shared by all pages and is always shut down before
 * the method returns or throws.
 *
 * @param form the notification to send; its sender user id must not be empty
 * @throws BusinessException if the sender user id is empty, or if dispatching
 *         a page fails or the calling thread is interrupted while waiting
 */
private void sendSystemNotifyToUser(NotifyAddForm form) {
    if (StringUtil.isEmpty(form.getSenderUserId())) {
        throw new BusinessException("发送用户不能为空!");
    }
    // Persist the message content first; recipients reference it by id.
    String notifyId = saveNotifyContent(form);
    // 0 = main account, 1 = sub-account, 2 = all
    Integer selectType = NotifyScopeEnum.selectType(form.getScope());

    int totalCount = 0;
    long totalStartTime = System.currentTimeMillis();
    List<NotifyUser> notifyUserList = new ArrayList<>();
    List<String> userIds = new ArrayList<>();
    logger.info("[系统消息 - 批量发送 - 开始] notifyId={}, form={}", notifyId, JSON.toJSONString(form));

    // NOTE(review): POOL_CORE_SIZE and paging are referenced unqualified, exactly as
    // before; they must be resolvable from the enclosing scope - confirm.
    // The queue holds one slot per bucket, so a page's tasks are never rejected.
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(POOL_CORE_SIZE,
            SystemMsgSendConfig.POOL_MAXIMUM_SIZE,
            SystemMsgSendConfig.POOL_KEEP_ALIVE_TIME,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<Runnable>(SystemMsgSendConfig.POOL_TRREAD_NUM));
    try {
        for (int page = 1; ; page++) {
            // Fetch the next page of user ids; an empty page ends the run.
            userIds = fetchUserList(userIds, selectType, paging, page, SystemMsgSendConfig.size);
            if (CollectionUtil.isEmpty(userIds)) {
                break;
            }
            // Build the rows to save for this page.
            notifyUserList = getNotifyUserList(notifyUserList, userIds, notifyId, form);

            long unitStartTime = System.currentTimeMillis();
            String threadName = Thread.currentThread().getName();
            logger.info("[单次执行信息 - 开始] notifyId={}, 线程={}, 当前页码={}, 当前页大小={}条", notifyId, threadName, page, notifyUserList.size());
            totalCount += notifyUserList.size();

            try {
                List<List<NotifyUser>> buckets =
                        splitIntoNonEmptyBuckets(notifyUserList, SystemMsgSendConfig.POOL_TRREAD_NUM);
                // One latch count per submitted task; UserTask counts down in its finally block.
                CountDownLatch latch = new CountDownLatch(buckets.size());
                for (List<NotifyUser> bucket : buckets) {
                    threadPool.execute(new UserTask(latch, bucket, notifyId));
                }
                logger.info("[单次执行信息 - 线程] notifyId={}, 当前线程池大小={}, 当前队列大小={}", notifyId, threadPool.getPoolSize(), threadPool.getQueue().size());
                // Block until every task of this page has finished.
                latch.await();
                logger.info("[单次执行信息 - 结束] notifyId={}, page={}, totalTime={}ms", notifyId, page, System.currentTimeMillis() - unitStartTime);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the flag so callers further up can see the interruption.
                    Thread.currentThread().interrupt();
                }
                logger.error("[系统消息-批量发送] exception-msg:", e);
                throw new BusinessException("系统消息-批量发送失败!");
            }

            // Reset the working lists before the next page.
            userIds.clear();
            notifyUserList.clear();
        }
    } finally {
        // On success every submitted task has already been awaited, so this only
        // releases the worker threads; on failure it also interrupts running tasks.
        threadPool.shutdownNow();
    }
    logger.info("[系统消息 - 批量发送 - 结束] notifyId={}, totalTime={}ms, allNum={}", notifyId, System.currentTimeMillis() - totalStartTime, totalCount);
}

/**
 * Distributes users over {@code bucketCount} lists by
 * {@code timeStamp mod bucketCount} and returns only the lists that received
 * at least one user.
 *
 * <p>{@link Math#floorMod(long, long)} is used so that a negative timestamp
 * still maps to a valid bucket instead of being dropped.
 *
 * @param users       the users to distribute
 * @param bucketCount number of buckets; must be positive
 * @return the non-empty buckets, in bucket-index order
 */
private List<List<NotifyUser>> splitIntoNonEmptyBuckets(List<NotifyUser> users, int bucketCount) {
    List<List<NotifyUser>> buckets = new ArrayList<>(bucketCount);
    for (int i = 0; i < bucketCount; i++) {
        buckets.add(new ArrayList<NotifyUser>());
    }
    for (NotifyUser user : users) {
        long timeStamp = user.getTimeStamp();
        int index = (int) Math.floorMod(timeStamp, (long) bucketCount);
        buckets.get(index).add(user);
    }
    List<List<NotifyUser>> nonEmpty = new ArrayList<>(bucketCount);
    for (List<NotifyUser> bucket : buckets) {
        if (!bucket.isEmpty()) {
            nonEmpty.add(bucket);
        }
    }
    return nonEmpty;
}
/**
* 执行任务
*/
class UserTask implements Runnable {
private CountDownLatch latch;
List<NotifyUser> notifyUserList = null;
String notifyId = null;
public UserTask(CountDownLatch latch, List<NotifyUser> notifyUserList, String notifyId) {
this.latch = latch;
this.notifyUserList = notifyUserList;
this.notifyId = notifyId;
}
@Override
public void run() {
try {
//发送系统消息
notifyUserService.batchSave(notifyUserList);
} catch (Exception e) {
logger.error("[系统消息-用户批量发送->UserTask->(Thread)Run],error is {}", e);
} finally {
latch.countDown();
}
}
/**
* 获取用户ids集合
* @param notifyUserList
* @return
*/
private List<String> getUserIdsList(List<NotifyUser> notifyUserList) {
List<String> list = new ArrayList<>();
notifyUserList.forEach(notifyUser -> {
list.add(notifyUser.getUserId());
});
return list;
}
}