运维在升级,无聊写博客
最近在实现消息通知平台时,需要针对不同的通知设置优先级,使得通知队列堵塞的时候可以优先推送高优先级的消息。为了保证通知队列的高效并发,通知队列的消费端采用多线程并发处理,因此需要实现一个支持优先级的多线程处理逻辑:
对于ThreadPoolExecutor来说,自定义的优先级线程池可以继承它,并沿用其构造函数,例如:
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
实现优先级线程池需要注意以下三点:
1.线程池中新提交的任务会放到workQueue中等待执行,如果要按优先级出队,那么该参数必须是PriorityBlockingQueue。
2.PriorityBlockingQueue容器中最终存储的是FutureTask对象,该对象是由newTaskFor实例化的,因此需要提供一个实现了Comparable接口的FutureTask子类【例如:ComparableFutureTask】
3.ComparableFutureTask中需要比较任务的优先级,因此提交给线程池的任务本身必须是带有优先级、可比较的任务对象【例如:PriorityTask】
根据以上三点,可参考的代码如下:
【PriorityTask】
/**
 * A {@link Runnable} that carries an integer priority so it can be ordered
 * inside a priority queue.
 *
 * <p>Ordering follows the natural order of the priority value: a task with a
 * smaller value compares as "less than" a task with a larger value. A task
 * whose priority is {@code null} compares as greater than every task that has
 * a priority, and as equal to any other task without one, so a missing
 * priority never makes the comparison throw.
 */
public abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {

    /** Priority used for ordering; may be {@code null}, which sorts last. */
    private final Integer priority;

    /**
     * Creates a task with the given priority.
     *
     * @param priority value used for ordering; smaller values sort first,
     *                 {@code null} sorts after every non-null value
     */
    public PriorityTask(Integer priority) {
        this.priority = priority;
    }

    /** The work to perform; supplied by the concrete task. */
    @Override
    public abstract void run();

    /**
     * Compares two tasks by priority, treating {@code null} as the largest value.
     *
     * @param o the task to compare with
     * @return a negative, zero or positive value as this task sorts before,
     *         equal to, or after {@code o}
     * @throws NullPointerException if {@code o} itself is {@code null}
     */
    @Override
    public int compareTo(PriorityTask o) {
        Integer other = o.priority;
        if (priority == null) {
            return other == null ? 0 : 1;
        }
        if (other == null) {
            return -1;
        }
        return priority.compareTo(other);
    }
}
【带有ComparableFutureTask的PriorityThreadPoolExecutor】
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ThreadPoolExecutor} whose submitted tasks are wrapped in a
 * {@link Comparable} {@link FutureTask}, so that a priority-ordered work queue
 * can sort them.
 *
 * <p>Only tasks that pass through {@code newTaskFor} are wrapped. NOTE(review):
 * per the JDK documentation {@code submit}/{@code invokeAll} use
 * {@code newTaskFor} while {@code execute(Runnable)} enqueues the runnable
 * as-is, so a runnable passed to {@code execute} would have to be comparable
 * by itself when the work queue is a priority queue - confirm against callers.
 */
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

    /** Same arguments as the matching {@link ThreadPoolExecutor} constructor. */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Same arguments as the matching {@link ThreadPoolExecutor} constructor. */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /** Same arguments as the matching {@link ThreadPoolExecutor} constructor. */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /** Same arguments as the matching {@link ThreadPoolExecutor} constructor. */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /** Wraps the runnable in a {@link ComparableFutureTask} that remembers it for ordering. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value);
    }

    /** Wraps the callable in a {@link ComparableFutureTask} that remembers it for ordering. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable);
    }

    /**
     * A {@link FutureTask} that orders itself by delegating to the wrapped
     * runnable/callable when that object is {@link Comparable}.
     *
     * <p>Declared {@code static} because it never touches the enclosing executor.
     *
     * @param <V> result type of the task
     */
    protected static class ComparableFutureTask<V> extends FutureTask<V>
            implements Comparable<ComparableFutureTask<V>> {

        /** The original runnable or callable; consulted only for ordering. */
        private final Object task;

        public ComparableFutureTask(Callable<V> callable) {
            super(callable);
            this.task = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result) {
            super(runnable, result);
            this.task = runnable;
        }

        /**
         * Orders two wrappers by their wrapped tasks.
         *
         * <p>When both wrapped objects are {@link Comparable} the comparison is
         * delegated to the wrapped task of this wrapper, regardless of whether
         * the two share the exact same runtime class; this lets sibling
         * subclasses of one comparable base type be ordered against each other.
         * Wrappers whose payloads are not mutually comparable are treated as equal.
         *
         * @param o the other wrapper; {@code null} makes this wrapper sort first
         * @return negative, zero or positive per the wrapped tasks' ordering,
         *         or {@code 0} when no ordering can be established
         */
        @Override
        public int compareTo(ComparableFutureTask<V> o) {
            if (this == o) {
                return 0;
            }
            if (o == null) {
                return -1; // high priority
            }
            if (!(task instanceof Comparable) || !(o.task instanceof Comparable)) {
                return 0;
            }
            // The generic parameter of Comparable is erased, so compatibility of the
            // two payloads can only be discovered by attempting the comparison.
            @SuppressWarnings("unchecked")
            Comparable<Object> mine = (Comparable<Object>) task;
            try {
                return mine.compareTo(o.task);
            } catch (ClassCastException ignored) {
                // Unrelated comparable types: no meaningful order, treat as equal.
                return 0;
            }
        }
    }
}
【使用代码如下】
import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.wjs.common.config.ConfigUtil; import com.wjs.message.bo.NotifyMessage; import com.wjs.message.service.notify.NotifyService; import com.wjs.message.service.queue.QueueService; /** * 消息发送队列,优先级队列实现 * * @author Silver * @date 2016年12月20日 下午8:20:45 * * */ @Service("queueService") public class QueueServicePriorityImpl implements QueueService { private static final Logger LOGGER = LoggerFactory.getLogger(QueueServicePriorityImpl.class); private volatile static Queue<NotifyMessage> queue = new PriorityBlockingQueue<NotifyMessage>(100000); @Autowired NotifyService notifyService; /** * 服务初始化,启动队列消费 * * @author Silver * @date 2016年12月21日 上午9:07:20 */ @PostConstruct public void init() { new Thread(new Runnable() { @Override public void run() { Integer execSize = ConfigUtil.getInteger("message.queue.poll.execsize"); if (null == execSize || execSize == 0) { // 由于任务后续是发送邮件/短信/调用APP推送/调用dubbo的,是非CPU密集型的计算,因此线程数控制在核数 * 3的值 execSize = Double.valueOf(Runtime.getRuntime().availableProcessors() * 3).intValue(); } LOGGER.info("Queue_Consume_thread_size:{}",execSize); ExecutorService es = new PriorityThreadPoolExecutor(execSize, execSize, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>()); while (true) { //** poll 移除并返问队列头部的元素 如果队列为空,则返回null final NotifyMessage message = queue.poll(); if (null != message) { es.submit(new PriorityTask(message.getPriority()) { @Override public void run() { try { System.out.println(message); } catch (Exception e) { LOGGER.error("MessageQueue-ERROR->:{},", message, e); } } }); } } } }).start(); } @Override public boolean push(NotifyMessage message) { // offer 添加一个元素并返回true 
如果队列已满,或者异常情况,则返回false return queue.offer(message); } @Override public Integer size() { return queue.size(); } }
【单测代码如下】
import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import com.wjs.message.bo.NotifyMessage; import com.wjs.message.bo.NotifyMessageEmail; import com.wjs.message.bo.NotifyMessagePush; import com.wjs.message.bo.NotifyMessageSms; import com.wjs.message.bo.NotifyMessageSys; import com.wjs.message.service.BaseServiceTest; public class QueueTestService extends BaseServiceTest{ @Autowired QueueService queueService; @Test public void testPull(){ for (int i = 10000; i > 1; i--) { NotifyMessage message = new NotifyMessage(); switch (i % 3) { case 0: message = new NotifyMessageEmail(); message.setContent("Email"+i); break; case 1: message = new NotifyMessageSys(); message.setContent("Sys"+i); break; case 2: message = new NotifyMessageSms(); message.setContent("Sms"+i); break; case 3: message = new NotifyMessagePush(); message.setContent("Push"+i); break; default: break; } message.setContent(i+""); message.setPriority(i); queueService.push(message); } try { Thread.sleep(1000000); } catch (InterruptedException e) { e.printStackTrace(); } } }