zoukankan      html  css  js  c++  java
  • websocket redis实现集群即时消息聊天

    websocket与redismq实现集群消息聊天

    1.application.properties

    server.port=8081
    
    #thymeleaf配置
    #是否启用模板缓存。
    spring.thymeleaf.cache=false
    #是否为Web框架启用Thymeleaf视图解析。
    spring.thymeleaf.enabled=true
    #在SpringEL表达式中启用SpringEL编译器。
    spring.thymeleaf.enable-spring-el-compiler=true
    #模板文件编码。
    spring.thymeleaf.encoding=UTF-8
    #要应用于模板的模板模式。另请参见Thymeleaf的TemplateMode枚举。
    spring.thymeleaf.mode=HTML5
    #在构建URL时添加前缀以查看名称的前缀。
    spring.thymeleaf.prefix=classpath:/templates/
    #Content-Type写入HTTP响应的值。
    spring.thymeleaf.servlet.content-type=text/html
    #在构建URL时附加到视图名称的后缀。
    spring.thymeleaf.suffix=.html
    
    ##单服务器
    spring.redis.host=192.168.159.129
    ##单端口
    spring.redis.port=6379
    ## 连接池最大连接数(使用负值表示没有限制) 
    spring.redis.pool.max-active=300
    ## Redis数据库索引(默认为0) 
    spring.redis.database=0
    ## 连接池最大阻塞等待时间(使用负值表示没有限制) 
    spring.redis.pool.max-wait=-1
    ## 连接池中的最大空闲连接 
    spring.redis.pool.max-idle=100
    ## 连接池中的最小空闲连接 
    spring.redis.pool.min-idle=20
    ## 连接超时时间(毫秒) 
    spring.redis.timeout=60000

    2.pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        
        <groupId>com.szw.learn</groupId>
        <artifactId>websocket_redis_mq_01</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>websocket_redis_mq_01</name>
        
        
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.16.RELEASE</version>
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <maven.test.skip>true</maven.test.skip>
            <skipTests>true</skipTests>
            <thymeleaf.version>3.0.7.RELEASE</thymeleaf.version>
            <thymeleaf-layout-dialect.version>2.1.2</thymeleaf-layout-dialect.version>
            <start-class>com.szw.learn.WsMqApplication</start-class>
        </properties>
    
        <dependencies>
            <!-- 使用web启动器 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- 测试 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- 模板引擎 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
            
            <!-- redis id与1.5之前的变了 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            
            <!-- websocket -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
        </dependencies>
    
        <repositories>
            <repository>
                <id>nexus-aliyun</id>
                <name>Nexus aliyun</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>nexus-aliyun</id>
                <name>Nexus aliyun</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
    
        <build>
            <plugins>
                <!-- 要将源码放上去,需要加入这个插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-source-plugin</artifactId>
                    <configuration>
                        <attach>true</attach>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>jar</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- 打包 -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <fork>true</fork>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    3.SpringUtils.java

    package com.szw.learn.util;
    
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.NoSuchBeanDefinitionException;
    import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.stereotype.Repository;
    
    @Repository
    public final class SpringUtils implements BeanFactoryPostProcessor {
    
        private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            SpringUtils.beanFactory = beanFactory;
        }
        
        public static ConfigurableListableBeanFactory getBeanFactory() {
            return beanFactory;
        }
    
        /**
         * 获取对象
         *
         * @param name
         * @return Object 一个以所给名字注册的bean的实例
         * @throws org.springframework.beans.BeansException
         *
         */
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String name) throws BeansException {
            return (T) getBeanFactory().getBean(name);
        }
    
        /**
         * 获取类型为requiredType的对象
         *
         * @param clz
         * @return
         * @throws org.springframework.beans.BeansException
         *
         */
        public static <T> T getBean(Class<T> clz) throws BeansException {
            T result = (T) getBeanFactory().getBean(clz);
            return result;
        }
    
        /**
         * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
         *
         * @param name
         * @return boolean
         */
        public static boolean containsBean(String name) {
            return getBeanFactory().containsBean(name);
        }
    
        /**
         * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
         *
         * @param name
         * @return boolean
         * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
         *
         */
        public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
            return getBeanFactory().isSingleton(name);
        }
    
        /**
         * @param name
         * @return Class 注册对象的类型
         * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
         *
         */
        public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
            return getBeanFactory().getType(name);
        }
    
        /**
         * 如果给定的bean名字在bean定义中有别名,则返回这些别名
         *
         * @param name
         * @return
         * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
         *
         */
        public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
            return getBeanFactory().getAliases(name);
        }
    
    }

    4.redis

    发布service:

    package com.szw.learn.redismq;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * @author 七脉 描述:发布service
     */
    @Component
    public class PublishService {
        @Autowired
        StringRedisTemplate redisTemplate;
    
        /**
         * @author 七脉 描述:发布方法
         * @param channel 消息发布订阅 主题
         * @param message 消息信息
         */
        public void publish(String channel, Object message) {
            // 该方法封装的 connection.publish(rawChannel, rawMessage);
            redisTemplate.convertAndSend(channel, message);
        }
    }

    订阅监听类:

    package com.szw.learn.redismq;
    
    import java.io.IOException;
    
    import javax.websocket.Session;
    
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    /**
     * @author 七脉 描述:订阅监听类
     */
    public class SubscribeListener implements MessageListener {
        
        private StringRedisTemplate stringRedisTemplate;
        
        private Session session;
        
        /**
         * 订阅接收发布者的消息
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String msg = new String(message.getBody());
            System.out.println(new String(pattern) + "主题发布:" + msg);
            if(null!=session){
                try {
                    session.getBasicRemote().sendText(msg);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public StringRedisTemplate getStringRedisTemplate() {
            return stringRedisTemplate;
        }
    
        public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
            this.stringRedisTemplate = stringRedisTemplate;
        }
    
        public Session getSession() {
            return session;
        }
    
        public void setSession(Session session) {
            this.session = session;
        }
        
    }

    注册redis监听容器:

    package com.szw.learn.redismq;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    
    @Configuration
    public class RedisConfig {
        @Autowired
        private JedisConnectionFactory jedisConnectionFactory;
        
        /**
         * @author 七脉 描述:需要手动注册RedisMessageListenerContainer加入IOC容器
         * @return
         */
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer() {
    
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            
            container.setConnectionFactory(jedisConnectionFactory);
    
            return container;
    
        }
    }

    5.websocket

    websocket注册:

    package com.szw.learn.websocket;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    @Configuration
    public class WebsocketConfig {
        /**
         * <br>描 述:    @Endpoint注解的websocket交给ServerEndpointExporter自动注册管理
         * @return
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    }

    websocket端点:

    package com.szw.learn.websocket;
    
    import java.io.IOException;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.stereotype.Component;
    
    import com.szw.learn.redismq.PublishService;
    import com.szw.learn.redismq.SubscribeListener;
    import com.szw.learn.util.SpringUtils;
    /**
     *@ServerEndpoint(value="/websocket")value值必须以/开路
     *备注:@ServerEndpoint注解类不支持使用@Autowire
     *{topic}指:向哪个频道主题里发消息
     *{myname}指:这个消息是谁的。真实环境里可以使用当前登录用户信息
     */
    @Component
    @ServerEndpoint(value="/websocket/{topic}/{myname}")
    public class WebsocketEndpoint {
        
        /**
         * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
         */
        private StringRedisTemplate redisTampate = SpringUtils.getBean(StringRedisTemplate.class);
        
        private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
        
        //存放该服务器该ws的所有连接。用处:比如向所有连接该ws的用户发送通知消息。
        private static CopyOnWriteArraySet<WebsocketEndpoint> sessions = new CopyOnWriteArraySet<>();
        
        private Session session;
        
        @OnOpen
        public void onOpen(Session session,@PathParam("topic")String topic){
            System.out.println("java websocket:打开连接");
            this.session = session;
            sessions.add(this);
            SubscribeListener subscribeListener = new SubscribeListener();
            subscribeListener.setSession(session);
            subscribeListener.setStringRedisTemplate(redisTampate);
            //设置订阅topic
            redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic(topic));
        }
        
        @OnClose
        public void onClose(Session session){
            System.out.println("java websocket:关闭连接");
            sessions.remove(this);
        }
        
        @OnMessage
        public void onMessage(Session session,String message,@PathParam("topic")String topic,@PathParam("myname")String myname) throws IOException{
            message = myname+":"+message;
            System.out.println("java websocket 收到消息=="+message);
            PublishService publishService = SpringUtils.getBean(PublishService.class);
            publishService.publish(topic, message);
        }
        
        @OnError
        public void onError(Session session,Throwable error){
            System.out.println("java websocket 出现错误");
        }
    
        public Session getSession() {
            return session;
        }
    
        public void setSession(Session session) {
            this.session = session;
        }
    }

    测试controller

    package com.szw.learn.websocket;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.servlet.ModelAndView;
    
    @Controller
    @RequestMapping("websocket")
    public class WebsocketController {
        
        @Value("${server.port}")
        private String port;
        
        public static final String INDEX = "websocket/index";
        
        /**
         * @author 七脉
         * 描述:聊天页
         * @param topic 发布订阅的频道主题
         * @param myname 发布者的显示名称
         * @return
         */
        @RequestMapping("index/{topic}/{myname}")
        public ModelAndView index(@PathVariable("topic")String topic,@PathVariable("myname")String myname){
            ModelAndView mav = new ModelAndView(INDEX);
            mav.addObject("port", port);
            mav.addObject("topic",topic);
            mav.addObject("myname",myname);
            return mav;
        }
    }

    6.启动类

    package com.szw.learn;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class WsMqApplication {
        public static void main(String[] args) {
            System.setProperty("spring.devtools.restart.enabled", "false");
            SpringApplication.run(WsMqApplication.class, args);
        }
    }

    7.测试页面

    <!doctype html>
    <html xmlns:th="http://www.thymeleaf.org">
    <head>
    <meta charset="utf-8"></meta>
    <title>websocket集群</title>
    </head>
    <body>
        本服务端口号:[[${port}]],使用redismq实现websocket集群<br/>
        [[${topic}]] 频道 聊天中。。。<br/>
        <input id="input_id" type="text" /><button onclick="sendMessage()">发送</button>    <button onclick="closeWebsocket()">关闭</button>
        <div id="message_id"></div>
    </body>
    <script type="text/javascript">
        document.getElementById('input_id').focus();
        var websocket = null;
        //当前浏览前是否支持websocket
        if("WebSocket" in window){
            var url = "ws://127.0.0.1:[[${port}]]/websocket/[[${topic}]]/[[${myname}]]";
            websocket = new WebSocket(url);
        }else{
            alert("浏览器不支持websocket");
        }
        
        websocket.onopen = function(event){
            setMessage("打开连接");
        }
        
        websocket.onclose = function(event){
            setMessage("关闭连接");
        }
        
        websocket.onmessage = function(event){
            setMessage(event.data);
        }
        
        websocket.onerror = function(event){
            setMessage("连接异常");
        }
        
        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function(){
            closeWebsocket();
        }
        
        //关闭websocket
        function closeWebsocket(){
            //3代表已经关闭
            if(3!=websocket.readyState){
                websocket.close();
            }else{
                alert("websocket之前已经关闭");
            }
        }
        
        //将消息显示在网页上
        function setMessage(message){
            document.getElementById('message_id').innerHTML += message + '<br/>';
        }
         
        //发送消息
        function sendMessage(){
            //1代表正在连接
            if(1==websocket.readyState){
                var message = document.getElementById('input_id').value;
                //setMessage(message);
                websocket.send(message);
            }else{
                alert("websocket未连接");
            }
            document.getElementById('input_id').value="";
            document.getElementById('input_id').focus();
        }
    </script>
    </html>

    8.测试

      启动两个服务,端口号分别8081、8082(可以+)

      模拟两个端口的地址:

        http://localhost:8081/websocket/index/like/董志峰

        http://localhost:8082/websocket/index/like/史振伟

      如图

      

    源码下载:https://pan.baidu.com/s/1VMQJgXe5vX7uwsyRV57gIw

  • 相关阅读:
    android 75 新闻列表页面
    android 74 下载文本
    android 73 下载图片
    android 72 确定取消对话框,单选对话框,多选对话框
    android 71 ArrayAdapter和SimpleAdapter
    android 70 使用ListView把数据显示至屏幕
    maven如何将本地jar安装到本地仓库
    Centos6.7搭建ISCSI存储服务器
    解决maven打包编译出现File encoding has not been set问题
    MySQL 解决 emoji表情 的方法,使用utf8mb4 字符集(4字节 UTF-8 Unicode 编码)
  • 原文地址:https://www.cnblogs.com/zwcry/p/9723447.html
Copyright © 2011-2022 走看看