zoukankan      html  css  js  c++  java
  • 近两年项目回顾系列——基于MINA+Flex的即时通讯系统

    老规矩,先说需求:
      用户希望有一个类似于QQ的即时通讯工具集成在我们的B/S审查系统里,而且不用单独登录,当登入工作台时这个工具同时也自动登录,主要是一些审查系统中的文字沟通、截图发送,暂无文件发送需求。
    技术分析:
      关于开发IM系统,我们的答案可能有很多种,如:http长连接、websocket、ajax轮询、socket等等。由于我们的项目是用Flex+Java做的,所以用socket是非常方便和容易的,所以我们想基于flex和java的socket自己开发一个IM工具。经过考察,我们选择了apache的成熟开源NIO框架MINA作为IM系统的Server。使用json格式进行交互。拿到需求后,google了几把,和架构师聊过之后,感觉MINA是完全能满足我们的需求,而且比较简单。(Flex的red5也不错,但是相对较复杂)花了两三天学习MINA,出了一个demo,然后将其扩充功能成功移植到项目中。这篇博文,主要讲述demo中的一些思路,列出部分代码,备忘并供后来者学习:
      NIO、MINA是什么不用我多说了,搜一把很多的。我们用MINA开发网络应用主要注意的几个对象有:

    (由于需求比较简单,没有使用MINA高级的东西如自定义协议、报文等功能,)

    下面就来看看核心代码和效果(
    这是第一版,主要为实现功能,Flex界面开发的比较丑,之后会精细地调调,加加皮肤、美化按钮等等,各位看官将就着看吧。

     先看看Server的代码结构:

    主要逻辑在chat包下,conf是为Flex加载策略文件、dao是连接MongoDB保存消息记录、ChatUtil主要是一些常用工具方法的封装、Test是单元测试。

    lib主要是:fastjson-1.1.26.jar、mina-core-2.0.7.jar、mongo-java-driver-2.9.3.jar、slf4j-api-1.6.6.jar和junit包。

    Server核心代码:

    /**
     * 启动即时通讯Server,监听843端口
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        ...略...
        PolicyFileLoader.loadPolicyFile();
        //1、创建接受端
        NioSocketAcceptor acceptor=new NioSocketAcceptor();
        //2、拿到filter链,新增一个自定义filter
        acceptor.getFilterChain().addLast("chatFilter", new ChatFilter());
        //3、端口事件处理对象
        acceptor.setHandler(new ChatHandler());
        //4、开始监听端口
        acceptor.bind(new InetSocketAddress(843));//初始化MongoDB,保存消息用
        Dao.initDB();
        ...略...
    }

    可以看见创建一个MINA的Server需要四步:
    创建接收端—>新增一个自定义filter—>绑定事件处理Handler—>开始监听端口

    下面看看ChanFilter,它继承自org.apache.mina.core.filterchain.IoFilterAdapter,这个类看名字明显是一个缺省适配器的设计模式,我们重写两个方法就OK了。

    messageReceived方法主要指定了接收到消息的一些动作。
      如果是Flex请求安全策略文件,我们直接返回准备好的策略文件,否则我们将字符串使用阿里巴巴的fastJson转换成我们所需的Msg类型。
    filterWrite方法主要指定了发送消息时的一些操作。
      其实就是把我们的Msg对象转成Json字符串然后用UTF-8的byteArray写出去。(这一块写的有点复杂:) )

    package com.demo.mina.server.chat;
    import java.nio.charset.Charset;
    
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.filterchain.IoFilterAdapter;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.core.write.DefaultWriteRequest;
    import org.apache.mina.core.write.WriteRequest;
    
    import com.alibaba.fastjson.JSON;
    import com.demo.mina.server.conf.PolicyFileLoader;
    
    /**
     * 聊天系统自定义Filter
     * @author sam
     *
     */
    public class ChatFilter extends IoFilterAdapter{
    
        @Override
        public void messageReceived(NextFilter nextFilter, IoSession session,Object message) throws Exception {
            IoBuffer buffer=(IoBuffer)message;
            String s=new String(buffer.array(),Charset.forName("utf-8")).trim();
            if(!s.equals(PolicyFileLoader.POLICY_FILE_REQUEST_HEADER)){
                //预处理接收的消息
                Msg msg = JSON.parseObject(s, Msg.class);
                msg.setInputContent(s);
                super.messageReceived(nextFilter, session, msg);
            }else {
                session.write(IoBuffer.wrap(PolicyFileLoader.getPolicyFileContent().getBytes()));
            }
        }
        
    
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session,WriteRequest writeRequest) throws Exception {
            if (writeRequest.getMessage() instanceof Msg) {
                //处理了消息输出
                super.filterWrite(nextFilter, session,new DefaultWriteRequest(IoBuffer.wrap(((Msg)writeRequest.getMessage()).toOutStr().getBytes(Charset.forName("utf-8")))));
            }else if (writeRequest.getMessage() instanceof IoBuffer) {
                super.filterWrite(nextFilter, session,writeRequest);
            }
        }
    }

     再看看前台主要逻辑:

    package com.dc.potal.model
    {
        import com.dc.business.tmaas.json.JSON;
        import com.dc.business.tmaas.tmzr.components.MyEvent;
        import com.dc.potal.vars.ChatConstant;
        
        import flash.events.Event;
        import flash.events.IOErrorEvent;
        import flash.events.ProgressEvent;
        import flash.events.SecurityErrorEvent;
        import flash.net.Socket;
        
        import mx.controls.Alert;
        import mx.events.CloseEvent;
        
        /**
         * 前台Socket封装类
         * @auth sam
         * */
        public class SocketConnector extends Socket{
            
            //constructor
            public function SocketConnector(){
            }
            
            /**
             * 自动登录,有异常则抛出事件,在上层监听解决。
             * */
            public function autoConnect(isListen:Boolean=true):void{
                if(isListen){
                    this.addEventListener(IOErrorEvent.IO_ERROR,socket_ioErrorHandler);
                    this.addEventListener(SecurityErrorEvent.SECURITY_ERROR,socket_securityErrorHandler);
                    this.addEventListener(Event.CONNECT,socket_connectHandler);
                    this.addEventListener(Event.CLOSE,socket_closeHandler);
                    this.addEventListener(ProgressEvent.SOCKET_DATA,socket_socketDataHandler);
                }
                this.connect(ChatConstant.currentHost,ChatConstant.PORT);
                trace("开始登录 "+ChatConstant.currentHost+":"+ChatConstant.PORT);
            }
            
            //ioErrorHandler
            protected function socket_ioErrorHandler(event:IOErrorEvent):void{
                dispatchEvent(new Event("FAILED"));
            }
            
            //securityErrorHandler
            protected function socket_securityErrorHandler(event:SecurityErrorEvent):void{
                dispatchEvent(new Event("SECURITY_FAILED"));
            }
            
            //closeHandler
            protected function socket_closeHandler(event:Event):void{
                dispatchEvent(new Event("CLOSED"));
            }
            
            //连接成功,向Server发送一个登录包,向Server注册Client业务信息
            protected function socket_connectHandler(event:Event):void{
                var msg:Msg = new Msg(Msg.LOGIN);
                msg.userId = ChatConstant.currentUserId;
                msg.userName = ChatConstant.currentUserName;
                msg.orgId = ChatConstant.currentUserOrgId;
                msg.orgName = ChatConstant.currentUserOrgName;
                this.writeUTFBytes(msg.totalJson());
                this.flush();
            }
            
            //接受到消息,转换成Json字符串
            protected function socket_socketDataHandler(event:ProgressEvent):void{
                var str:String = this.readUTFBytes(this.bytesAvailable);
                trace("Received:"+str);
                var msg:* = JSON.decode(str);
                dispatchEvent(new MyEvent("MESSAGE",msg));
            }
        }
    }

    然后在一个聊天窗口(TitleWindow)中初始化socket,使用这个socket与我们之前Java的Client进行通讯即可。

    看看最终效果吧:

      1、用户登录系统后,自动连接消息服务器

    2、加载在线人员列表(暂未开发离线消息发送),同时实时更新上下线信息。

    3、发送消息,可以添加表情、选择字号、加粗、斜体、下划线等效果

    4、发送完的效果 

     5、当有新消息到达时,系统右下角的“喇叭”开始闪动,并显示消息数目

     

     6、使用MongoDB存储消息记录(比较简单,暂时未加消息的关键字检索)

      

     写在最后:

       这个功能是快上线的时候临时开发的,有诸多不善待改进。如:

     1、缺少截屏发送功能(Flex的截屏非常方便,创建一个BitmapData对象,将绘制的矩形区域像素信息传递给它,再编码一下就OK。传输时可以考虑用Base64转换后传给Server

     2、缺少文件发送功能,这个要再深入研究一下MINA。(似乎要用到StreamIoHandler的processStreamIo等方法,可以考虑再重新起一个监听专门处理文件传输需求。

     3、UI优化、写写skin文件、看看美工有没有漂亮的按钮和配色方案。

     4、消息历史需要新增“关键字”检索功能。这个比较容易,MongoDB的Java接口已经写好了,就等前台Flex调用了,而且测试在百万级别记录中检索速度还是让人满意的。

  • 相关阅读:
    安装和使用Glassfish
    3GP 的 DASH设计原则与标准
    hdu 2686 Matrix 最小费用最大流 或 多线程DP
    [Win32]客户端程序
    二叉查找树中节点的删除。
    Android:获取屏幕完整宽高,包含状态栏
    Django学习总结之五模型
    畸形的从业观
    调试版本和发行版本
    jsp中常见的错误处理(未完待续)
  • 原文地址:https://www.cnblogs.com/radio/p/3136024.html
Copyright © 2011-2022 走看看