zoukankan      html  css  js  c++  java
  • 【springcloud】【转载】基于redis消息订阅和websocket实现的消息推送

    基于redis消息订阅和websocket技术实现的消息推送

                本文【转载】自:https://my.oschina.net/freide/blog/2991435

    依赖文件

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
        </dependencies>

    创建Redis消息监听者容器

    @Configuration
    public class RedisConfig {
    
        /**
         * 创建消息监听器
         * @param factory
         * @return
         */
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(factory);
            return container;
        }
    }

    创建Websocket配置类

     

    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
    * 这个配置类的作用是要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。
    * 如果是使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。

    */ @Component
    public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

     

    创建消息订阅监听者类

     

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.Session;
    import java.io.IOException;
    
    /**
     * redis消息订阅监听者
     */
    @Component
    public class RedisSubscribeListener implements MessageListener {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        //webSocket客户端会话对象
        private Session session;
    
        /**
         * 接收发布者消息
         * @param message
         * @param bytes
         */
        @Override
        public void onMessage(Message message, byte[] bytes) {
            String msg = new String(message.getBody());
            logger.info("[{}]主题发布:{}", new String(bytes), msg);
            if (session != null && session.isOpen()) {
                try {
                    session.getBasicRemote().sendText(msg);
                } catch (IOException e) {
                    logger.error("[redis监听器]发布消息异常:{}", e);
                }
            }
        }
    
        public Session getSession() {
            return session;
        }
    
        public void setSession(Session session) {
            this.session = session;
        }
    }

     这个消息订阅监听者类持有websocket的客户端会话对象(session),当接收到订阅的消息时,通过这个会话对象(session)将消息发送到前端,从而实现消息的主动推送。

    创建Websocket服务端类

    @Component
    
    @ServerEndpoint("/websocket/server")
    
    public class WebSocketServer {
    
        /**
    
         * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
    
         */
    
        private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
    
        //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    
         private static  AtomicInteger onlineCount=new AtomicInteger(0);
    
         //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    
         private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
    
         //与某个客户端的连接会话,需要通过它来给客户端发送数据
    
         private Session session;
    
         private SubscribeListener subscribeListener;
    
        /**
    
         * 连接建立成功调用的方法
    
         * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
    
         */
    
        @OnOpen
    
        public void onOpen(Session session){
    
            this.session = session;
    
            webSocketSet.add(this);     //加入set中
    
            addOnlineCount();           //在线数加1
    
            System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
    
            subscribeListener = new SubscribeListener();
    
            subscribeListener.setSession(session);
    
            //设置订阅topic
    
            redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC"));
    
        }
    
        /**
    
         * 连接关闭调用的方法
    
         */
    
        @OnClose
    
        public void onClose() throws IOException {
    
            webSocketSet.remove(this);  //从set中删除
    
            subOnlineCount();           //在线数减1
    
            redisMessageListenerContainer.removeMessageListener(subscribeListener);
    
            System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
    
        }
    
    
    
        /**
    
         * 收到客户端消息后调用的方法
    
         * @param message 客户端发送过来的消息
    
         * @param session 可选的参数
    
         */
    
        @OnMessage
    
        public void onMessage(String message, Session session) {
    
            System.out.println("来自客户端的消息:" + message);
    
            //群发消息
    
            for(WebSocketServer item: webSocketSet){
    
                try {
    
                    item.sendMessage(message);
    
                } catch (IOException e) {
    
                    e.printStackTrace();
    
                    continue;
    
                }
    
            }
    
        }
    
    
    
        /**
    
         * 发生错误时调用
    
         * @param session
    
         * @param error
    
         */
    
        @OnError
    
        public void onError(Session session, Throwable error){
    
            System.out.println("发生错误");
    
            error.printStackTrace();
    
        }
    
    
    
        /**
    
         * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
    
         * @param message
    
         * @throws IOException
    
         */
    
        public void sendMessage(String message) throws IOException {
    
            this.session.getBasicRemote().sendText(message);
    
        }
    
    
    
        public   int getOnlineCount() {
    
            return onlineCount.get();
    
        }
    
    
    
        public   void addOnlineCount() {
    
            WebSocketServer.onlineCount.getAndIncrement();
    
        }
    
    
    
        public   void subOnlineCount() {
    
            WebSocketServer.onlineCount.getAndDecrement();
    
        }
    
    }
    @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,
    客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,
    但在springboot中连容器都是spring管理的。

     虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。


    注意的是在客户端链接关闭的方法onClose中,一定要 删除之前的订阅监听对象,就是下面这行代码:
    redisMessageListenerContainer.removeMessageListener(subscribeListener);
     
     否则在浏览器刷一下之后,后台会报如下错误:
    java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close())
    may be called on a closed session
     
    原因就是当链接关闭之后,session对象就没有了,而订阅者对象还是会接收消息,在用session对象发送消息时会报错。
    虽然代码中加了判断 if(null != session && session.isOpen()) { 可以避免报错,但是为了防止内存泄漏,应该把没有用的监听者对象从容器中删除。

    创建前端页面

       在resourcestatic目录下创建html页面,命名为websocket.html。代码如下:

     <!doctype html>
    
    <html xmlns:th="http://www.thymeleaf.org">
    
    <head>
    
        <meta charset="utf-8"></meta>
    
        <title>websocket</title>
    
    </head>
    
    <h4>
    
    使用redis订阅消息和websocket实现消息推送
    
    </h4>
    
    <br/>
    
    <h5>收到的订阅消息:</h5>
    
    <div id="message_id"></div>
    
    </body>
    
    <script type="text/javascript">
    
        var websocket = null;
    
        //当前浏览前是否支持websocket
    
        if("WebSocket" in window){
    
            var url = "ws://localhost:8080/demo/websocket/server";
    
            websocket = new WebSocket(url);
    
        }else{
    
            alert("浏览器不支持websocket");
    
        }
    
    
    
        websocket.onopen = function(event){
    
            setMessage("打开连接");
    
        }
    
    
    
        websocket.onclose = function(event){
    
            setMessage("关闭连接");
    
        }
    
    
    
        websocket.onmessage = function(event){
    
            setMessage(event.data);
    
        }
    
    
    
        websocket.onerror = function(event){
    
            setMessage("连接异常");
    
        }
    
    
    
        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    
        window.onbeforeunload = function(){
    
            closeWebsocket();
    
        }
    
    
    
        //关闭websocket
    
        function closeWebsocket(){
    
            //3代表已经关闭
    
            if(3!=websocket.readyState){
    
                websocket.close();
    
            }else{
    
                alert("websocket之前已经关闭");
    
            }
    
        }
    
        //将消息显示在网页上
    
        function setMessage(message){
    
            document.getElementById('message_id').innerHTML += message + '<br/>';
    
        }
    
    </script>
    
    </html>
    View Code

    启动服务进行测试

    1. 启动springboot服务,浏览器输入地址:http://localhost:8080/demo/websocket.html,此时页面显示如下

     2.打开redis客户端,在命令行输入publish  TOPIC   “this is test message”

     浏览器页面显示如下:

    说明刚刚发布的消息已经主动推送到浏览器显示了。

      完整代码见: https://gitee.com/freide/springboot

     

    【转载】自 https://my.oschina.net/freide/blog/2991435

     

     

     

    ____________________________特此,勉励____________________________
    本文作者cheng2839
    本文链接https://www.cnblogs.com/cheng2839
    关于博主:评论和私信会在第一时间回复。
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
    声援博主:如果您觉得文章对您有帮助,可以点击文章右下角【推荐】一下。您的鼓励是博主的最大动力!
  • 相关阅读:
    视频: 不抱怨才有今天的马云---励志演讲
    ArcGIS图框工具5.2发布,支持ArcGIS10.0,10.110.2,支持国家2000坐标系
    arcgis 10.2 安装教程(含下载地址)
    delete
    基金销售牌照火热的背后,基金销售牌照、基金支付牌照
    快递业务经营许可证企业信息(截止2016.6.30)
    1月北上广P2P平台之最 平台数成交量现双降
    公募基金牌照:谁在布局?
    delete
    各地互联网小贷牌照申请全揭秘
  • 原文地址:https://www.cnblogs.com/cheng2839/p/13637235.html
Copyright © 2011-2022 走看看