zoukankan      html  css  js  c++  java
  • spring+rabbitmq+stomp搭建websocket消息推送(非spring boot方式)

    前言:

    两年前做过spring+activemq+stomp的ws推送,那个做起来很简单,但现在公司用的mq中间件是rabbitmq,因此需要通过rabbitmq去做ws通信。仔细搜了搜百度/谷歌,网上通过spring boot+rabbitmq+stomp的教程文章倒是一搜一大把,可惜目前的项目是非spring boot的,没法套用。只好自己去捣鼓。搞了几个小时,终于弄出来了,特此与大家分享下。

    RabbitMQ:

     怎么安装就不是本篇讨论的话题了,自己百度/谷歌之。rabbitmq默认自带了stomp插件,但是需要自己启用。命令为:

    rabbitmq-plugins enable rabbitmq_stomp

    来来来,给个文档地址参考参考,http://www.rabbitmq.com/stomp.html。默认用guest用户去连接,密码也是guest。

    这里有个问题,看rabbitmq配置文件,stomp协议端口默认是61613,但是用ws协议连接却始终连接不上,所以只能用web stomp端口,端口号是15674,这个跟activemq有所区别。(P.S. 此处最好有大神来解惑,或者告知如何用61613来连

    Javascript:

    前端代码撸起来最方便,关键是调试也容易,因此先来。

    var stompClient = null;
    
    var headers = {
      login: 'guest',
      passcode: 'guest'
    };
    
    function wsConnect(url) {
        var ws = new SockJS(url);
        stompClient = Stomp.over(ws);
    
        //var ws = new WebSocket(url);
        //stompClient = Stomp.over(ws);
    
        // SockJS does not support heart-beat: disable heart-beats
        stompClient.heartbeat.outgoing = 0;
        stompClient.heartbeat.incoming = 0;
    
        stompClient.connect(headers, function (frame) {
            console.log('Connected: ' + frame);
    
            stompClient.subscribe('/topic/test', function (sms) {
                var obj = JSON.parse(sms.body)
                var count = obj.totalCount;
    
                console.log("count: " + count);
            });
    
        });
    }
    

    然后就连接呗。

    $(function(){
        var url = "http://host:15674/stomp";
        wsConnect(url);
    });   
    

     撸完准备测试,当然是选择chrome喽,页面加载后,打开console控制台,可以看到web socket连上了,前端大功告成。

      

    Java:

    定义一个StompService类专门用来发送stomp消息。注意:rabbitmq 3.7以后stomp插件不再支持sockjs,因此写法会有变化。

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.messaging.converter.StringMessageConverter;
    import org.springframework.messaging.simp.stomp.StompHeaders;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.WebSocketHttpHeaders;
    import org.springframework.web.socket.client.WebSocketClient;
    import org.springframework.web.socket.client.standard.StandardWebSocketClient;
    import org.springframework.web.socket.messaging.WebSocketStompClient;
    import org.springframework.web.socket.sockjs.client.SockJsClient;
    import org.springframework.web.socket.sockjs.client.Transport;
    import org.springframework.web.socket.sockjs.client.WebSocketTransport;
    
    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * stomp服务  rabbitmq做中间件
     * @author Selwyn
     * @version $Id: WebSocketConfig.java, v 0.1 9/7/2018 9:59 AM Selwyn Exp $
     */
    @Component
    public class StompService {
    
        private static final String URL_TEMPLATE = "http://%s:%s/stomp";
    
        @Value("${rabbit.host}")
        private String host;
    
        //@Value("${rabbit.stomp.port}")
        private Integer port = 15674;
    
        /**
         * 连接用户名
         */
        //@Value("${rabbit.stomp.login}")
        private String login = "guest";
        /**
         * 连接密码
         */
        //@Value("${rabbit.stomp.passCode}")
        private String passCode = "guest";
    
        private String url;
    
        @PostConstruct
        public void init()
        {
            url = String.format(URL_TEMPLATE, host, port);
        }
    
        /**
         * 发送stomp消息
         * @param dest  目的地 比如/topic/test
         * @param toSend  要发送的信息
         * @param <T>
         */
        public <T> void connectAndSend(String dest, T toSend)
        {
            WebSocketClient client = new StandardWebSocketClient();
    
            List<Transport> transports = new ArrayList<>(1);
            transports.add(new WebSocketTransport( client) );
            //rabbitmq 3.7以后就别这么写了。直接new WebSocketStompClient(client)就行
            WebSocketClient transport = new SockJsClient(transports);
            WebSocketStompClient stompClient = new WebSocketStompClient(transport);
            //StompSessionHandlerAdapter默认的payload类型是String, 因此MessageConverter必须是StringMessageConverter
            stompClient.setMessageConverter(new StringMessageConverter());
    
            final CustomStompSessionHandler sessionHandler =
                    new CustomStompSessionHandler(dest, toSend);
    
            WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
            headers.setSecWebSocketProtocol("13");
    
            //连接用户名/密码也是必须的,否则连不上
            StompHeaders sHeaders = new StompHeaders();
            sHeaders.add("login", this.login);
            sHeaders.add("passcode", this.passCode);
    
            //开始连接,回调连接上后发送stomp消息
            stompClient.connect(url, headers, sHeaders, sessionHandler);
    
            //要同步得到发送结果的话,用CountDownLatch来做或者connect结果的future对象做get
        }
    
    }
    

    然后编写回调handler类。

    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.messaging.simp.stomp.StompCommand;
    import org.springframework.messaging.simp.stomp.StompHeaders;
    import org.springframework.messaging.simp.stomp.StompSession;
    import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
    
    
    /**
     * 自定义stomp session 回调handler
     * @author Selwyn
     * @version $Id: CustomStompSessionHandler.java, v 0.1 9/7/2018 3:43 PM Selwyn Exp $
     */
    @Slf4j
    public class CustomStompSessionHandler extends StompSessionHandlerAdapter {
    
        /**
         * 要发送的对象,将会json化传输出去
         */
        private Object toSend;
    
        /**
         * 目的地,一般是topic地址
         */
        private String dest;
    
        public CustomStompSessionHandler(String dest, Object toSend) {
            this.toSend = toSend;
            this.dest = dest;
        }
    
        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            super.handleFrame(headers, payload);
        }
    
        @Override
        public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
            super.afterConnected(session, connectedHeaders);
            String msg = JSON.toJSONString(toSend);
            try{
                session.send(dest, msg);
            }catch(Exception e)
            {
                log.error("failed to send stomp msg({}) to destination {}", msg, dest);
            }finally {
                //做完了关闭呗
                session.disconnect();
            }
        }
    
        @Override
        public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
            super.handleException(session, command, headers, payload, exception);
            log.error("stomp error: {}", exception);
        }
    
        @Override
        public void handleTransportError(StompSession session, Throwable exception) {
            super.handleTransportError(session, exception);
            log.error("stomp transport error: {}", exception);
        }
    
        public void setToSend(Object toSend) {
            this.toSend = toSend;
        }
    
        public void setDest(String dest) {
            this.dest = dest;
        }
    }
    

    再自己写个controller或者写个单元测试方法,这里就不给出代码了,撸完后启动服务,就可以测试消息推送了,实践证明,真香!

    结尾:

    其实整个过程还没完,需要考虑到连接中断等情况,客户端和服务后台都需要做好重连机制。通过sockjs这种方式连接是没有心跳机制的,这个比activemq带的stomp插件要low。个人建议,如果能用spring boot的话尽量用那种方式去实现stomp消息推送。

      

  • 相关阅读:
    linux系统下抢占式内核与非抢占式内核的区别
    Cache映射
    Delphi利用系统环境变量获取常用系统目录
    visual studio2008中AJAX的安装配置,及错误!
    网站配置工具无法建立与数据库的连接的解决方案
    PowDesigner工具的使用
    近日网站开发收获(一)
    (转载)power designer 12.5和破解补丁下载
    《Sqlserver 之我的新大陆》
    学习之路
  • 原文地址:https://www.cnblogs.com/selwynHome/p/9609298.html
Copyright © 2011-2022 走看看