zoukankan      html  css  js  c++  java
  • 根据redis的pub/sub机制,写一个即时在线聊天应用

    在Redis中,有个Pub/Sub,他的主要的工作流程如:  

    redis订阅一个模式频道如:chat_*,然后由小a想找人聊天了,就发送一个消息“现在有人聊天吗?chat_a”,末尾的chat_a为标识,表示你要在chat_* 这个圈子里面说。这个时候,chat_*这个圈子的管理员,就会对所有加入这个圈子的人发送一条消息。消息内容就是小a说的话。说白了,就是有个大喇叭,你说话声音不够大,但是你想让所有人都听到你的消息,那么你就要先对喇叭说话,然后喇叭把你的话扩散。。。。

    还是根据代码说,直接描述比抽象函数还要抽象。

    首先我们先在配置文件里面配置下订阅的频道对应的监听:

     1   <!--chat-->
     2     <bean id="msgListener" class="com.anhoo.util.MyMsgListener"/>
     3 
     4     <bean id="listenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
     5         <property name="connectionFactory" ref="jedisConnFactory"/>
     6         <property name="messageListeners">
     7             <map>
     8                 <entry key-ref="msgListener" value-ref="patternTopic"/>
     9             </map>
    10         </property>
    11     </bean>
    12 
    13     <bean id="patternTopic" class="org.springframework.data.redis.listener.PatternTopic">
    14         <constructor-arg value="chat_*"/>
    15     </bean>

    2行是根据监听消息的接口写的监听类,当监听到有消息的时候,就会调用onMessage类

    public class MyMsgListener implements MessageListener {
        @Autowired
        SubService subService;
    @Override
    public void onMessage(Message message, byte[] bytes) { subService.isCall(message); // System.out.println(SerializeUtil.unserialize(bytes).toString()); System.out.println("当前的Message值为:" + message.toString()); } }

    4-11行 是pub/sub监听配置,redis的配置文件是  jedisConnFactory,监听的频道模式为 patternTopic,监听到的消息处理类为 msgListener

    13-15行 是监听频道的设置

    消息的entity为:

    public class MessageEntity implements Serializable {
    
        private String user;
        private String content;
    
        //省略get set
    }

    Controller层代码,主要有三个方法,sendMessage 是在页面中发送发送消息,(前台有一个ajax方法一直在请求)callMsg 当监听到新的消息的时候,返回监听到的消息,addChatUser 当有新的用户加入的时候,做记录,1是方便以后根据用户返回数据,2是防止重复的用户名。

    @Controller
    @RequestMapping("/back")
    public class ChatController {
    
        @Autowired
        PubService pubService;
    
        @Autowired
        SubService subService;
    
        @Autowired
        ChatService chatService;
    
        @ResponseBody
        @RequestMapping("/send")
        public ResultMsg sendMessage(MessageEntity messageEntity) {
            ResultMsg resultMsg = new ResultMsg();
            if (messageEntity != null && !messageEntity.getUser().equals("") && !messageEntity.getContent().equals("")) {
                pubService.sendMessage(messageEntity);
                resultMsg.setMsg("发送成功!");
                resultMsg.setCode(ResultCode.SUCCESS);
                return resultMsg;
            }
            resultMsg.setMsg("输入信息有误!");
            resultMsg.setCode(ResultCode.FAIL);
            return resultMsg;
        }
    
        @ResponseBody
        @RequestMapping("/callBack")
        public ResultMsg callMsg(String user) throws InterruptedException {
            ResultMsg resultMsg = new ResultMsg();
            Logger logger = LogManager.getLogger(ChatController.class);
            MessageEntity message;
            message = subService.callBack(user);
            if (message != null) {
                resultMsg.setCode(ResultCode.SUCCESS);
                resultMsg.setContent(message);
                return resultMsg;
            } else {
                resultMsg.setCode(ResultCode.FAIL);
                return resultMsg;
            }
        }
    
        @ResponseBody
        @RequestMapping("/join")
        public ResultMsg addChatUser(String user) {
            ResultMsg resultMsg = new ResultMsg();
            if (chatService.addUser(user) > 0) {
                resultMsg.setCode(ResultCode.SUCCESS);
            } else {
                resultMsg.setCode(ResultCode.FAIL);
                resultMsg.setMsg("昵称已经存在,请重新输入!");
            }
            return resultMsg;
        }
    }

    Service 有三个 接口类为:

    ChatService
    int addUser(String user); 
    
    PubService
    void sendMessage(MessageEntity messageEntity);
    
    SubService
    void isCall(Message message);
    MessageEntity callBack(String user) throws InterruptedException;

    ChatService接口的具体方法代码

    @Service
    public class ChatServiceImpl implements ChatService {
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        @Override
        public int addUser(String user) {
            //判断userList 已经其中的用户是否已经存在
            if (stringRedisTemplate.hasKey("userList") && stringRedisTemplate.opsForZSet().score("userList", user) != null) {
                return 0;
            } else {
                //增加新的用户,但是要判断下,是否是第一次刚启动的时候
                int currentInidex;
                if (stringRedisTemplate.hasKey("msgList")) {
                    currentInidex = (int) (-1 - stringRedisTemplate.opsForList().size("msgList"));
                } else {
                    currentInidex = -1;
                }
                stringRedisTemplate.opsForZSet().add("userList", user, currentInidex);
                return 1;
            }
        }
    }

    PubService接口的具体方法代码:

    @Service
    public class PubServiceImpl implements PubService {
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        @Override
        public void sendMessage(MessageEntity messageEntity) {
            //消息的频道为chat_*
            String channel = "chat_";
            String content = messageEntity.getContent();
            //使得发送消息的 频道为chat_用户名  例如chat_jack 为了后面能根据这个得到 jack用户
            stringRedisTemplate.convertAndSend(channel + messageEntity.getUser(), content);
        }
    }

    SubService接口的具体方法代码:

    @Service
    public class SubServiceImpl implements SubService {
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        @Autowired
        JedisConnectionFactory jedisConnFactory;
    
        @Override
        public void isCall(Message message) {
            MessageEntity messageEntity = new MessageEntity();
            //请参考配置文件,本例中key,value的序列化方式均为string。
            //其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
            messageEntity.setUser(stringRedisTemplate.getStringSerializer().deserialize(message.getChannel()).split("_")[1]);
            messageEntity.setContent(stringRedisTemplate.getValueSerializer().deserialize(message.getBody()).toString());
            stringRedisTemplate.opsForList().leftPush("msgList", JSON.toJSONString(messageEntity));
    
    //        Jedis jedis = (Jedis) jedisConnFactory.getConnection().getNativeConnection();
    //        stringRedisTemplate.opsForValue().set("broadcast",jedis.pubsubNumPat().toString() );
    //        System.out.println(jedis.pubsubNumPat().toString());
    //        jedis.close();
    
        }
    
        @Override
        public MessageEntity callBack(String user) throws InterruptedException {
    
            //模拟1s 查看一次 不至于一直在连接redis 低于1s的频率连接redis会报错
            Thread.sleep(1000);
    //            String msgTxt = stringRedisTemplate.opsForList().rightPop("msgList");
            //获取当前user 对应的消息 坐标值
            Double index = stringRedisTemplate.opsForZSet().score("userList", user);
    
            long l = new Double(index).longValue();
            if (stringRedisTemplate.hasKey("msgList")) {
                String msgTxt = stringRedisTemplate.opsForList().index("msgList", l);
    
                //只有当msgList 有新的消息的时候,才会获取消息
                if (msgTxt != null && msgTxt != "") {
    //                list.remove(user);
                    MessageEntity messageEntity = JSON.parseObject(msgTxt, MessageEntity.class);
    
                    //消息坐标加-1
                    stringRedisTemplate.opsForZSet().incrementScore("userList", user, -1);
                    return messageEntity;
                }
            }
            return null;
    
        }
    }

    service层用到的redis主要是 list和zset,当有用户发送消息的时候,就把消息放到list中,获取第一条可以是: opsForList().index("msgList", -1) ,第二条为:opsForList().index("msgList", -2),第三台为opsForList().index("msgList", -3)……以此类推,又因为我们前端有个ajax一直发送请求,按道理是只要我们list中有消息,我们就把他拿出来,在页面展示。但是这里又不能实时的判断当前是不是所有的用户都获取过一次,而且仅仅只能为一次。这个时候就根据list的长度以及zset的score来判断了。过程为:当有用户加入的时候, 如果是第一个用户,那么就把他的zset的score设为-1,此时list中的消息为空,只有当我们发送一条消息的时候,onMessage做出响应,再把发送的消息存到list中,这个时候,一直发送请求的ajax发现,此时消息的长度为了1(可以通过 opsForList().index("msgList", -1)得到),而且当前用户的score标志为-1,正好他们一致。那么就把这个消息取出来,在前台页面展示,然后把score自自增-1,等待list里面再次有消息放入(长度为2,可以通过opsForList().index("msgList", -2)获取)的时候才满足取出消息的条件。。以此循环;如果不是第一个加入的,就把现在消息的长度放到score中,只有当接受下一条数据的时候才展示。

    前台代码。js部分:

    $(function () {
        $("#user").focus();
    });
    
    function loading(user) {
        eoooxy.ajax("post", "/back/callBack", {"user":user}, function (r) {
            if (!eoooxy.isEmpty(r) && r.code == '100') {
                var o = r.content;
                var h = "<div style='margin: 10px 20px 10px 20px;'><label>" + o.user + "</label><br><label>" + o.content + "</label></div>";
                $("#chatSpace").append(h);
                $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight;
                //$("#content").focus();
                loading(user);
            } else {
                console.log("当前没有消息,继续请求……");
                loading(user);
            }
        }, "json"/*, function (XMLHttpRequest, status) {
            if (status == 'timeout') {//超时,status还有success,error等值的情况
                loading();
            }
        }, 3000*/)
    }
    
    function chatting() {
        if (eoooxy.isEmpty($("#user").val())) {
            alert("必须先输入昵称,然后点击开始聊天!");
            return false;
        }
        var data = {"user": $("#user").val()};
        eoooxy.ajax("post", "/back/join", data, function (r) {
            if (!eoooxy.isEmpty(r) && r.code == '100') {
                $("#user").attr("disabled", "disabled");
                loading($("#user").val());
            } else {
                alert(r.msg);
            }
        }, "json")
    }
    
    function sendInfo() {
        var message = $("#content");
        var data = {"user": $("#user").val(), "content": $("#content").val()};
        // var data = {"user": "jack", "content": $("#content").val()};
        eoooxy.ajax("post", "/back/send", data, function (r) {
            if (!eoooxy.isEmpty(r) && r.code == '100') {
                message.val('');
                $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight;
                message.focus();
            } else {
                alert(r.msg);
            }
    
        })
    }

    jsp部分:

    <%@ page language="java" contentType="text/html; charset=UTF-8"
             pageEncoding="UTF-8" %>
    <html>
    <head>
        <%@include file="/WEB-INF/view/common/meta.jsp" %>
        <title>聊天室</title>
        ${f:addJs("resources/js/back/chat.js")}
    </head>
    <body>
    <div style=" auto;" align="center">
        <h1>多人聊天室</h1>
        <span>
            <input id="user" style="resize: none;outline: none;" placeholder="昵称,必须输入">
            <button onclick="chatting()">开始聊天</button>
        </span>
        <div id="chatSpace"
             style=" 600px;height: 500px; border: solid 1px #CCCCCC; overflow-y: auto;text-align: left">
    
        </div>
        <div style=" 600px; height: 100px;border: solid 1px #CCCCCC; margin-top: 20px;">
            <textarea id="content"
                      style=" 590px;height: 60px; resize: none;border: 0px;outline: none;margin: 5px;"></textarea>
            <button style="float: right;margin-right: 10px;" id="btn_send" onclick="sendInfo()">确定</button>
        </div>
    </div>
    </body>
    </html>

    以上就是根据pub/sub 以及ajax长连接写的一个在线实时聊天系统(实际上延迟1s),如果有错,请指出,谢谢!

    因为这边采用的是ajax长连接(就是一直问:有没有消息啊,有没有消息啊,有的话拿走然后继续问。。),所以会占用资源。如果我们能更好的优化他,我们可以使用H5的新的特性WebSocket来构建实时的聊天系统,具体这边我就不介绍了,因为我也还没搞透彻,没有调查没有发言权。。。

    示例代码git连接:https://github.com/eoooxy/anhoo

  • 相关阅读:
    ViewPager自动轮播、小圆点切换
    android自定义view属性
    单点触控
    自定义view、自定义圆形imageview
    为textview中的字符串着色
    xstream解析、httputils请求
    WebView加载页面
    HorizontalScrollView滑动 ViewPager切换
    Spring Boot 热部署(IDEA与Gradle)
    Linux下安装JDK
  • 原文地址:https://www.cnblogs.com/eoooxy/p/6924214.html
Copyright © 2011-2022 走看看