zoukankan      html  css  js  c++  java
  • 近两年项目回顾系列——基于MINA+Flex的即时通讯系统

    老规矩,先说需求:
      用户希望有一个类似于QQ的即时通讯工具集成在我们的B/S审查系统里,而且不用单独登录,当登入工作台时这个工具同时也自动登录,主要是一些审查系统中的文字沟通、截图发送,暂无文件发送需求。
    技术分析:
      关于开发IM系统,我们的答案可能有很多种,如:http长连接、websocket、ajax轮询、socket等等。由于我们的项目是用Flex+Java做的,所以用socket是非常方便和容易的,所以我们想基于flex和java的socket自己开发一个IM工具。经过考察,我们选择了apache的成熟开源NIO框架MINA作为IM系统的Server。使用json格式进行交互。拿到需求后,google了几把,和架构师聊过之后,感觉MINA是完全能满足我们的需求,而且比较简单。(Flex的red5也不错,但是相对较复杂)花了两三天学习MINA,出了一个demo,然后将其扩充功能成功移植到项目中。这篇博文,主要讲述demo中的一些思路,列出部分代码,备忘并供后来者学习:
      NIO、MINA是什么不用我多说了,搜一把很多的。我们用MINA开发网络应用主要注意的几个对象有:

    (由于需求比较简单,没有使用MINA高级的东西如自定义协议、报文等功能,)

    下面就来看看核心代码和效果(
    这是第一版,主要为实现功能,Flex界面开发的比较丑,之后会精细地调调,加加皮肤、美化按钮等等,各位看官将就着看吧。

     先看看Server的代码结构:

    主要逻辑在chat包下,conf是为Flex加载策略文件、dao是连接MongoDB保存消息记录、ChatUtil主要是一些常用工具方法的封装、Test是单元测试。

    lib主要是:fastjson-1.1.26.jar、mina-core-2.0.7.jar、mongo-java-driver-2.9.3.jar、slf4j-api-1.6.6.jar和junit包。

    Server核心代码:

    /**
     * 启动即时通讯Server,监听843端口
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        ...略...
        PolicyFileLoader.loadPolicyFile();
        //1、创建接受端
        NioSocketAcceptor acceptor=new NioSocketAcceptor();
        //2、拿到filter链,新增一个自定义filter
        acceptor.getFilterChain().addLast("chatFilter", new ChatFilter());
        //3、端口事件处理对象
        acceptor.setHandler(new ChatHandler());
        //4、开始监听端口
        acceptor.bind(new InetSocketAddress(843));//初始化MongoDB,保存消息用
        Dao.initDB();
        ...略...
    }

    可以看见创建一个MINA的Server需要四步:
    创建接收端—>新增一个自定义filter—>绑定事件处理Handler—>开始监听端口

    下面看看ChanFilter,它继承自org.apache.mina.core.filterchain.IoFilterAdapter,这个类看名字明显是一个缺省适配器的设计模式,我们重写两个方法就OK了。

    messageReceived方法主要指定了接收到消息的一些动作。
      如果是Flex请求安全策略文件,我们直接返回准备好的策略文件,否则我们将字符串使用阿里巴巴的fastJson转换成我们所需的Msg类型。
    filterWrite方法主要指定了发送消息时的一些操作。
      其实就是把我们的Msg对象转成Json字符串然后用UTF-8的byteArray写出去。(这一块写的有点复杂:) )

    package com.demo.mina.server.chat;
    import java.nio.charset.Charset;
    
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.filterchain.IoFilterAdapter;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.core.write.DefaultWriteRequest;
    import org.apache.mina.core.write.WriteRequest;
    
    import com.alibaba.fastjson.JSON;
    import com.demo.mina.server.conf.PolicyFileLoader;
    
    /**
     * 聊天系统自定义Filter
     * @author sam
     *
     */
    public class ChatFilter extends IoFilterAdapter{
    
        @Override
        public void messageReceived(NextFilter nextFilter, IoSession session,Object message) throws Exception {
            IoBuffer buffer=(IoBuffer)message;
            String s=new String(buffer.array(),Charset.forName("utf-8")).trim();
            if(!s.equals(PolicyFileLoader.POLICY_FILE_REQUEST_HEADER)){
                //预处理接收的消息
                Msg msg = JSON.parseObject(s, Msg.class);
                msg.setInputContent(s);
                super.messageReceived(nextFilter, session, msg);
            }else {
                session.write(IoBuffer.wrap(PolicyFileLoader.getPolicyFileContent().getBytes()));
            }
        }
        
    
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session,WriteRequest writeRequest) throws Exception {
            if (writeRequest.getMessage() instanceof Msg) {
                //处理了消息输出
                super.filterWrite(nextFilter, session,new DefaultWriteRequest(IoBuffer.wrap(((Msg)writeRequest.getMessage()).toOutStr().getBytes(Charset.forName("utf-8")))));
            }else if (writeRequest.getMessage() instanceof IoBuffer) {
                super.filterWrite(nextFilter, session,writeRequest);
            }
        }
    }

     再看看前台主要逻辑:

    package com.dc.potal.model
    {
        import com.dc.business.tmaas.json.JSON;
        import com.dc.business.tmaas.tmzr.components.MyEvent;
        import com.dc.potal.vars.ChatConstant;
        
        import flash.events.Event;
        import flash.events.IOErrorEvent;
        import flash.events.ProgressEvent;
        import flash.events.SecurityErrorEvent;
        import flash.net.Socket;
        
        import mx.controls.Alert;
        import mx.events.CloseEvent;
        
        /**
         * 前台Socket封装类
         * @auth sam
         * */
        public class SocketConnector extends Socket{
            
            //constructor
            public function SocketConnector(){
            }
            
            /**
             * 自动登录,有异常则抛出事件,在上层监听解决。
             * */
            public function autoConnect(isListen:Boolean=true):void{
                if(isListen){
                    this.addEventListener(IOErrorEvent.IO_ERROR,socket_ioErrorHandler);
                    this.addEventListener(SecurityErrorEvent.SECURITY_ERROR,socket_securityErrorHandler);
                    this.addEventListener(Event.CONNECT,socket_connectHandler);
                    this.addEventListener(Event.CLOSE,socket_closeHandler);
                    this.addEventListener(ProgressEvent.SOCKET_DATA,socket_socketDataHandler);
                }
                this.connect(ChatConstant.currentHost,ChatConstant.PORT);
                trace("开始登录 "+ChatConstant.currentHost+":"+ChatConstant.PORT);
            }
            
            //ioErrorHandler
            protected function socket_ioErrorHandler(event:IOErrorEvent):void{
                dispatchEvent(new Event("FAILED"));
            }
            
            //securityErrorHandler
            protected function socket_securityErrorHandler(event:SecurityErrorEvent):void{
                dispatchEvent(new Event("SECURITY_FAILED"));
            }
            
            //closeHandler
            protected function socket_closeHandler(event:Event):void{
                dispatchEvent(new Event("CLOSED"));
            }
            
            //连接成功,向Server发送一个登录包,向Server注册Client业务信息
            protected function socket_connectHandler(event:Event):void{
                var msg:Msg = new Msg(Msg.LOGIN);
                msg.userId = ChatConstant.currentUserId;
                msg.userName = ChatConstant.currentUserName;
                msg.orgId = ChatConstant.currentUserOrgId;
                msg.orgName = ChatConstant.currentUserOrgName;
                this.writeUTFBytes(msg.totalJson());
                this.flush();
            }
            
            //接受到消息,转换成Json字符串
            protected function socket_socketDataHandler(event:ProgressEvent):void{
                var str:String = this.readUTFBytes(this.bytesAvailable);
                trace("Received:"+str);
                var msg:* = JSON.decode(str);
                dispatchEvent(new MyEvent("MESSAGE",msg));
            }
        }
    }

    然后在一个聊天窗口(TitleWindow)中初始化socket,使用这个socket与我们之前Java的Client进行通讯即可。

    看看最终效果吧:

      1、用户登录系统后,自动连接消息服务器

    2、加载在线人员列表(暂未开发离线消息发送),同时实时更新上下线信息。

    3、发送消息,可以添加表情、选择字号、加粗、斜体、下划线等效果

    4、发送完的效果 

     5、当有新消息到达时,系统右下角的“喇叭”开始闪动,并显示消息数目

     

     6、使用MongoDB存储消息记录(比较简单,暂时未加消息的关键字检索)

      

     写在最后:

       这个功能是快上线的时候临时开发的,有诸多不善待改进。如:

     1、缺少截屏发送功能(Flex的截屏非常方便,创建一个BitmapData对象,将绘制的矩形区域像素信息传递给它,再编码一下就OK。传输时可以考虑用Base64转换后传给Server

     2、缺少文件发送功能,这个要再深入研究一下MINA。(似乎要用到StreamIoHandler的processStreamIo等方法,可以考虑再重新起一个监听专门处理文件传输需求。

     3、UI优化、写写skin文件、看看美工有没有漂亮的按钮和配色方案。

     4、消息历史需要新增“关键字”检索功能。这个比较容易,MongoDB的Java接口已经写好了,就等前台Flex调用了,而且测试在百万级别记录中检索速度还是让人满意的。

  • 相关阅读:
    数据结构-树与二叉树-思维导图
    The last packet successfully received from the server was 2,272 milliseconds ago. The last packet sent successfully to the server was 2,258 milliseconds ago.
    idea连接mysql报错Server returns invalid timezone. Go to 'Advanced' tab and set 'serverTimezone' property
    redis学习笔记
    AJAX校验注册用户名是否存在
    AJAX学习笔记
    JSON学习笔记
    JQuery基础知识学习笔记
    Filter、Listener学习笔记
    三层架构学习笔记
  • 原文地址:https://www.cnblogs.com/radio/p/3136024.html
Copyright © 2011-2022 走看看