zoukankan      html  css  js  c++  java
  • websocket+rabbitmq实战

    1. websocket+rabbitmq实战

    1.1. 前言

      接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息

    1.2. 遇坑

    1. 基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现websocket每隔一段时间会断开,看网上有人因为nginx的连接超时机制断开,而我这似乎是因为长连接空闲时间太长而断开
    2. 经过测试,如果一直保持每隔段时间发送消息,那么连接不会断开,所以我采用了断开重连机制,分三种情况
      1. 服务器正常,客户端正常且空闲时间不超过1分钟,则情况正常,超过一分钟会断线,前端发起请求重连
      2. 服务器正常,客户端关闭或注销,服务器正常收到通知,去除对应客户端session
      3. 服务器异常,客户端正常,客户端发现连不上服务器会尝试重连3次,3次都连不上放弃重连
    3. rabbitmq定向推送,按需求需要一台机器对应一批用户,所以定制化需要服务启动的时候定向订阅该ip对应的队列名,简单说就是动态队列名的设定,所以又复杂了点,不能直接在注解写死。同时因为使用的apollo配置中心,同一集群应该相同的配置,所以也不能通过提取配置的方式设定值,为了这个点设置apollo的集群方式有点小题大做,所以采用动态读取数据库对应的ip取出对应的队列名。
    4. 部署线上tomcat的话,不需要加上一块代码
    /**
     * 使用tomcat启动无需配置
     */
    //@Configuration
    //@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
    

    1.3. 正式代码

    1.3.1. rabbimq部分

    1. application.properties配置
    spring.rabbitmq.addresses = i.tzxylao.com:5672
    spring.rabbitmq.username = admin
    spring.rabbitmq.password = 123456
    spring.rabbitmq.virtual-host = /
    spring.rabbitmq.connection-timeout = 15000
    
    1. 交换机和队列配置
    /**
     * @author laoliangliang
     * @date 2019/3/29 11:41
     */
    @Configuration
    @ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
    public class RabbitmqConfig {
    
        final public static String EXCHANGENAME = "websocketExchange";
    
        /**
         * 创建交换器
         */
        @Bean
        FanoutExchange exchange() {
            return new FanoutExchange(EXCHANGENAME);
        }
    
        @Bean
        public Queue queue(){
            return new Queue(orderQueueName());
        }
    
        @Bean
        Binding bindingExchangeMessage(Queue queue,FanoutExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange);
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(OrderReceiver orderReceiver, @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
            // 监听队列的名称
            container.setQueueNames(orderQueueName());
            container.setExposeListenerChannel(true);
            // 设置每个消费者获取的最大消息数量
            container.setPrefetchCount(100);
            // 消费者的个数
            container.setConcurrentConsumers(1);
            // 设置确认模式为自动确认
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setMessageListener(orderReceiver);
            return container;
        }
    
    
        /**
         * 在这里写获取订单队列名的具体过程
         * @return
         */
        public String orderQueueName(){
            return "orderChannel";
        }
    }
    
    
    1. 消息监听类
    /**
     * @author laoliangliang
     * @date 2019/3/29 11:38
     */
    @Component
    @Slf4j
    @ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
    public class OrderReceiver implements ChannelAwareMessageListener {
    
        @Autowired
        private MyWebSocket myWebSocket;
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            byte[] body = message.getBody();
            log.info("接收到消息:" + new String(body));
            try {
                myWebSocket.sendMessage(new String(body));
            } catch (IOException e) {
                log.error("send rabbitmq message error", e);
            }
        }
    }
    

    1.3.2. websocket部分

    1. 配置服务端点
    @Configuration
    @ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
    
    1. 核心代码
    /**
     * @author laoliangliang
     * @date 2019/3/28 14:40
     */
    public abstract class AbstractWebSocket {
    
        protected static Map<String, CopyOnWriteArraySet<Session>> sessionStore = new HashMap<>();
    
        public void sendMessage(String message) throws IOException {
            List<String> userCodes = beforeSendMessage();
            for (String userCode : userCodes) {
                CopyOnWriteArraySet<Session> sessions = sessionStore.get(userCode);
                //阻塞式的(同步的)
                if (sessions !=null && sessions.size() != 0) {
                    for (Session s : sessions) {
                        if (s != null) {
                            s.getBasicRemote().sendText(message);
                        }
                    }
                }
            }
        }
    
        /**
         * 删选给谁发消息
         * @return
         */
        protected abstract List<String> beforeSendMessage();
    
        protected void clearSession(Session session) {
            Collection<CopyOnWriteArraySet<Session>> values = sessionStore.values();
            for (CopyOnWriteArraySet<Session> sessions : values) {
                for (Session session1 : sessions) {
                    if (session.equals(session1)) {
                        sessions.remove(session);
                    }
                }
            }
        }
    }
    
    @ServerEndpoint(value = "/websocket")
    @Component
    @ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
    public class MyWebSocket extends AbstractWebSocket {
    
    
        private static Logger log = LogManager.getLogger(MyWebSocket.class);
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @PostConstruct
        public void init() {
            /*ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            executorService.scheduleAtFixedRate(new Runnable() {
    
                int i = 0;
    
                @Override
                public void run() {
                    amqpTemplate.convertAndSend(RabbitFanout.EXCHANGENAME, "",("msg num : " + i).getBytes());
                    i++;
                }
            }, 50, 1, TimeUnit.SECONDS);*/
        }
    
        /**
         * 连接建立成功调用的方法
         *
         * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        @OnOpen
        public void onOpen(Session session) throws TimeoutException {
            log.info("websocket connect");
            //10M
            session.setMaxTextMessageBufferSize(10485760);
        }
    
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose(Session session) {
            clearSession(session);
        }
    
        /**
         * 收到客户端消息后调用的方法
         *
         * @param message 客户端发送过来的消息
         * @param session 可选的参数
         */
        @OnMessage
        public void onMessage(String message, Session session) {
            log.info("from client request:" + message);
            CopyOnWriteArraySet<Session> sessions = sessionStore.get(message);
            if (sessions == null) {
                sessions = new CopyOnWriteArraySet<>();
            }
            sessions.add(session);
            sessionStore.put(message, sessions);
        }
    
        /**
         * 发生错误时调用
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            clearSession(session);
        }
    
        /**
         * 这里返回需要给哪些用户发送消息
         * @return
         */
        @Override
        protected List<String> beforeSendMessage() {
            //TODO 给哪些用户发送消息
            return Lists.newArrayList("6");
        }
    }
    

    1.3.3. 前端代码

    var websocket = null;
    var reconnectCount = 0;
    function connectSocket(){
        var data = basicConfig();
        if(data.websocketEnable !== "true"){
            return;
        }
        //判断当前浏览器是否支持WebSocket
        if ('WebSocket' in window) {
            if(data.localIp && data.localIp !== "" && data.serverPort && data.serverPort !== ""){
                websocket = new WebSocket("ws://"+data.localIp+":"+data.serverPort+data.serverContextPath+"/websocket");
            }else{
                return;
            }
        }else {
            alert('当前浏览器 不支持WebSocket')
        }
    
        //连接发生错误的回调方法
        websocket.onerror = function () {
            console.log("连接发生错误");
        };
    
        //连接成功建立的回调方法
        websocket.onopen = function () {
            reconnectCount = 0;
            console.log("连接成功");
        };
    
        //接收到消息的回调方法,此处添加处理接收消息方法,当前是将接收到的信息显示在网页上
        websocket.onmessage = function (event) {
            console.log("receive message:" + event.data);
        };
    
        //连接关闭的回调方法
        websocket.onclose = function () {
            console.log("连接关闭,如需登录请刷新页面。");
            if(reconnectCount === 3) {
                reconnectCount = 0;
                return;
            }
            connectSocket();
            basicConfig();
            reconnectCount++;
        };
    
        //添加事件监听
        websocket.addEventListener('open', function () {
            websocket.send(data.userCode);
        });
    
    
        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
            console.log("closeWebSocket");
        };
    
    
    }
    
    connectSocket();
    
    function basicConfig(){
            var result = {};
            $.ajax({
                type: "post",
                async: false,
                url: "${request.contextPath}/basicConfig",
                data: {},
                success: function (data) {
                    result = data;
                }
            });
            return result;
        }
    

    1.3.4. 后端提供接口

        @ApolloConfig
        private Config config;
    
        @RequestMapping(value = {"/basicConfig"})
        @ResponseBody
        public Map<String, Object> getUserCode(HttpSession session) {
            Map<String, Object> map = new HashMap<>(2);
            map.put("userCode",String.valueOf(session.getAttribute("userCode")));
            String websocketEnable = config.getProperty("websocket.enabled", "false");
            String serverContextPath  = config.getProperty("server.context-path", "");
            map.put("websocketEnable", websocketEnable);
            map.put("serverContextPath", serverContextPath);
    
            String localIp = config.getProperty("local.ip", "");
            String serverPort = config.getProperty("server.port", "80");
    
            map.put("localIp", localIp);
            map.put("serverPort", serverPort);
            return map;
        }
    
  • 相关阅读:
    submile text3常用快捷键
    校外登录知网
    关于tensorflow和numpy版本问题FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated;
    全序列卷积神经网络( deep fully convolutional neural network, DFCNN)实践记录
    Windows anaconda中下载tensorflow,keras和其他库在pycharm中的配置
    Windows10 CUDA 、CUDNN、tensorflow、kreas、python版本选择和安装
    kaldi安装
    python 基础算法
    Python 笔记
    Qt-QCustomplot参考
  • 原文地址:https://www.cnblogs.com/sky-chen/p/10636964.html
Copyright © 2011-2022 走看看