zoukankan      html  css  js  c++  java
  • Java服务器端消息队列实战

    服务端口监听--报文接收--报文解码--业务处理--报文编码--写回客户端

    从服务端与客户端成功握手并产生一个socket后,为了提高吞吐能力,接下来的事情就可以交给多线程去处理。

    为了对接入的请求做合理的限制、控制,引入消息队列缓冲技术。

    队列,主动推送消息和被动拉取消息两种方式实现,并且可以在两种实现上增加自定义的策略,例如:流量控制等。

    接下来将使用Java语言实现队列与多线程整合技术的实现。

    这里直接使用LinkedBlockingQueue队列,自带队列阻塞功能,免去线程安全控制。

    package hope.queue.blockdemo;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    /**
     * 消息队列缓冲定义
     * @author hp
     *
     */
    public class PushBlockQueue extends LinkedBlockingQueue<Object>{
        
        private static final long serialVersionUID = -8224792866430647454L;
        private static ExecutorService es = Executors.newFixedThreadPool(10);//线程池
        private static PushBlockQueue pbq = new PushBlockQueue();//单例
        private boolean flag = false;
        
        private PushBlockQueue(){}
        
        public static PushBlockQueue getInstance(){
            return pbq;
        }
        
        /**
         * 队列监听启动
         */
        public void start(){
            if(!this.flag){
                this.flag = true;
            }else{
                throw new IllegalArgumentException("队列已处于启动状态,不允许重复启动.");
            }
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(flag){
                        try {
                            Object obj = take();//使用阻塞模式获取队列消息
                            //将获取消息交由线程池处理
                            es.execute(new PushBlockQueueHandler(obj));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
            
        }
        
        /**
         * 停止队列监听
         */
        public void stop(){
            this.flag = false;
        }
    }

    定义队列处理器,这个处理器实现Runnable接口,是为了与线程池做衔接。

    package hope.queue.blockdemo;
    /**
     * 队列消息处理实现
     * @author hp
     *
     */
    public class PushBlockQueueHandler implements Runnable {
    
        private Object obj;
        public PushBlockQueueHandler(Object obj){
            this.obj = obj;
        }
        
        @Override
        public void run() {
            doBusiness();
        }
        
        /**
         * 业务处理时限
         */
        public void doBusiness(){
            System.out.println(" 处理请求 "+obj );
        }
    
    }

    测试实例

    package hope.queue.blockdemo;
    
    public class AppTest {
    
        /**
         * @param args
         * @throws Exception 
         */
        public static void main(String[] args) throws Exception {
            PushBlockQueue.getInstance().start();
            for(;;){
                Thread.sleep(1000);
                PushBlockQueue.getInstance().put("0123456");
            }
        }
    
    }

    输出结果

    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456
    处理请求 0123456

    这种模式可以应用到很多场景,希望对大家工作上有所帮助。

  • 相关阅读:
    Xshell相关优化
    Inotify+rsync远程实时同步
    MySQL主从复制故障解决
    Docker部署centos7容器
    Salt-ssh批量部署minion
    MySQL数据库二
    防火墙2
    MySQl数据库
    防火墙
    http原理2
  • 原文地址:https://www.cnblogs.com/orionhp/p/6364149.html
Copyright © 2011-2022 走看看