zoukankan      html  css  js  c++  java
  • 优先级线程池实现

    运维在升级,无聊写博客

      最近在实现消息通知平台上面,对于针对不同的通知需要设置优先级,实现当通知队列堵塞的时候可以有限推送高优先级的消息。为了保证通知队列的高效并发,通知队列的消费端是采用多线程并发处理的,因此需要实现一个可以实现优先级的多线程处理逻辑:

    对于ThreadPollExecutor来说,

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

    如果实现优先级线程池需要注意一下三点

    1.线程池中新加入的线程会放到workQueue中,如果是优先级队列,那么该参数必须要是PriorityBlockingQueue。

    2.PriorityBlockingQueue容器中最终存储的是FutureTask对象,改对象是newTaskFor实例化的,因此需要实现继承自Comparable的FutureTask实现【例如:ComparableFutureTask】

    3.ComparableFutureTask中实现比较线程的优先级,需要将实例化具有优先级的线程对象【例如:PriorityTask】

    如上根据上面的点,可参考的代码如下

    【PriorityTask】

    public abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {
    
        private Integer prority;
    
    
    
        public PriorityTask(Integer prority) {
            this.prority = prority;
        }
    
        @Override
        public abstract void run();
        
    
        @Override
        public int compareTo(PriorityTask o) {
    
            return prority.compareTo(o.prority);
        }
    }
    View Code

    【带有ComparableFutureTask的PriorityThreadPoolExecutor】

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    
    public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
    
        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                }
    
                public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
                        RejectedExecutionHandler handler) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
                }
    
                public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
                }
    
                public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory, RejectedExecutionHandler handler) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
                }
    
                @Override
                protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                    return new ComparableFutureTask<T>(runnable, value);
                }
    
                @Override
                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                    return new ComparableFutureTask<T>(callable);
                }
    
                protected class ComparableFutureTask<V>
                        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>> {
                    private Object object;
    
                    public ComparableFutureTask(Callable<V> callable) {
                        super(callable);
                        object = callable;
                    }
    
                    public ComparableFutureTask(Runnable runnable, V result) {
                        super(runnable, result);
                        object = runnable;
                    }
    
                    @Override
                    @SuppressWarnings("unchecked")
                    public int compareTo(ComparableFutureTask<V> o) {
                        if (this == o) {
                            return 0;
                        }
                        if (o == null) {
                            return -1; // high priority
                        }
                        if (object != null && o.object != null) {
                            if (object.getClass().equals(o.object.getClass())) {
                                if (object instanceof Comparable) {
                                    return ((Comparable) object).compareTo(o.object);
                                }
                            }
                        }
                        return 0;
                    }
                }
    }
    View Code

    【使用代码如下】

    import java.util.Queue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    import javax.annotation.PostConstruct;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.wjs.common.config.ConfigUtil;
    import com.wjs.message.bo.NotifyMessage;
    import com.wjs.message.service.notify.NotifyService;
    import com.wjs.message.service.queue.QueueService;
    
    /**
     * 消息发送队列,优先级队列实现
     * 
     * @author Silver
     * @date 2016年12月20日 下午8:20:45
     * 
     *
     */
    @Service("queueService")
    public class QueueServicePriorityImpl implements QueueService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(QueueServicePriorityImpl.class);
    
        private volatile static Queue<NotifyMessage> queue = new PriorityBlockingQueue<NotifyMessage>(100000);
    
        @Autowired
        NotifyService notifyService;
    
        /**
         * 服务初始化,启动队列消费
         * 
         * @author Silver
         * @date 2016年12月21日 上午9:07:20
         */
        @PostConstruct
        public void init() {
    
            new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    Integer execSize = ConfigUtil.getInteger("message.queue.poll.execsize");
                    if (null == execSize || execSize == 0) {
                        // 由于任务后续是发送邮件/短信/调用APP推送/调用dubbo的,是非CPU密集型的计算,因此线程数控制在核数  * 3的值
                        execSize = Double.valueOf(Runtime.getRuntime().availableProcessors() * 3).intValue();
                    }
                    LOGGER.info("Queue_Consume_thread_size:{}",execSize);
                    ExecutorService es = new PriorityThreadPoolExecutor(execSize, execSize,
                                    0L, TimeUnit.MILLISECONDS,
                                    new PriorityBlockingQueue<Runnable>());
                    while (true) {
                        //**    poll         移除并返问队列头部的元素    如果队列为空,则返回null
                        final NotifyMessage message = queue.poll();
                        if (null != message) {
                            es.submit(new PriorityTask(message.getPriority()) {
    
                                @Override
                                public void run() {
    
                                    try {
                                        System.out.println(message);
                                    } catch (Exception e) {
                                        LOGGER.error("MessageQueue-ERROR->:{},", message, e);
                                    }
                                }
                            });
                        }
                    }
    
                }
            }).start();
        }
    
        @Override
        public boolean push(NotifyMessage message) {
            //        offer  添加一个元素并返回true 如果队列已满,或者异常情况,则返回false
    
            return queue.offer(message);
        }
    
        @Override
        public Integer size() {
    
            return queue.size();
        }
    
    }
    View Code

    【单测代码如下】

    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.wjs.message.bo.NotifyMessage;
    import com.wjs.message.bo.NotifyMessageEmail;
    import com.wjs.message.bo.NotifyMessagePush;
    import com.wjs.message.bo.NotifyMessageSms;
    import com.wjs.message.bo.NotifyMessageSys;
    import com.wjs.message.service.BaseServiceTest;
    
    
    public class QueueTestService extends BaseServiceTest{
    
        
        @Autowired
        QueueService queueService;
        
        @Test
        public void testPull(){
            for (int i = 10000; i > 1; i--) {
    
                NotifyMessage message = new NotifyMessage();
                switch (i % 3) {
                case 0:
                    message = new NotifyMessageEmail();
                    message.setContent("Email"+i);
                    break;
                case 1:
                    message = new NotifyMessageSys();
                    message.setContent("Sys"+i);
                    break;
                case 2:
                    message = new NotifyMessageSms();
                    message.setContent("Sms"+i);
                    break;
                case 3:
                    message = new NotifyMessagePush();
                    message.setContent("Push"+i);
                    break;
    
                default:
                    break;
                }
                message.setContent(i+"");
                message.setPriority(i);
                queueService.push(message);
            }
            
            try {
                Thread.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    View Code
  • 相关阅读:
    MySQL 你好,死锁
    Python+Scrapy+Selenium数据采集
    令牌桶限频(Token Bucket)
    Go 逃逸分析
    ElasticSearch 连载二 中文分词
    ElasticSearch 连载一 基础入门
    基于GitLab CI搭建Golang自动构建环境
    Go 性能分析之案例一
    MySQL InnoDB 行记录格式(ROW_FORMAT)
    MySQL InnoDB 逻辑存储结构
  • 原文地址:https://www.cnblogs.com/loveyou/p/6227352.html
Copyright © 2011-2022 走看看