zoukankan      html  css  js  c++  java
  • Java与Flex通信

      提到通信就得面临两个问题,一是通信协议的选择,二是数据协议的定义。通信协议耳熟能详的就有好几种,TCPUDPHTTPFTP等等。数据协议是一种数据交换的格式,像jason,xml,amf3,google protocol都可以用作数据协议,你也可以自己根据通信的效率,安全等因素来定义自己的数据协议。

      通信系统的开发是一项很复杂的工作,不要以为往发服务端发一个Hello World!就认为完全掌握了通信系统的开发。概括来说要开发一个健壮的通信系统,必须从这几个方面来着手。

      一,通信粘包的处理

      这里包的概念是逻辑上的数据包,也就是我们发送的一个完整业务消息包,粘包情况有两种,一种是粘在一起的包都是完整的数据包,另一种情况是粘在一起的包有不完整的包。不是所有的粘包现象都需要处理,若传输的数据为不带结构的连续流数据(如文件传输),则不必把粘连的包分开(简称分包)。但在实际工程应用中,传输的数据一般为带结构的数据,这时就需要做分包处理。

      为了避免粘包现象,可采取以下几种措施。一是对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令pushTCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;二是对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;三是由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。

      以上提到的三种措施,都有其不足之处。总的来说降低了通信系统的吞吐量。我们可以自己设计一个分包算法来处理粘包的问题,该算法的实现是这样的:

    1. 当有数据到达时,将数据压入程序缓冲区。
    2. 循环处理缓冲区,如果缓冲区长度大于包头长度,则取出长度信息n,否则跳出循环,如果缓冲区的长度大于n,则从缓冲区取出一个完整包进行处理,否则跳出循环。

    如果你是Java的爱好都可以参考一下Mina和netty2的实现,像Mina和Netty2都提供了粘包处理类可供使用,像Mina的CumulativeProtocolDecoder类,Netty2的LengthFieldBasedFrameDecoder。

      二,数据协议选择

      现在已经有很多数据协议可供我们选择,像jason,xml,amf3,google protocol等等,这些协议相应的语言都有API来对自身数据做协议处理,我们选择协标准无非就是效率和大小,这里每个人可以根据实际的应用环境选择适合的数据协议。

      三,网络系统的安全性

      网络安全是一个永远的话题,对通信数据加密一般常RSA对byte流加密,FLOOD验证,IP黑名单验证都是必须考虑到的。

      以上是做网络开发必须了解的一些基础知识,在这里我们使用一个具体的实例来加深一下理解,Java与Flex使用AMF3数据协议通信。做过网络开发的一般都会知道套接字(SOCKET),很多语言都会通SOCKET来提供对网络操作的API,Java的提供的NIO SOCKET是一个高效的异步通信API,当然可以在这个基础上来开发我们的网络应用,但这种Native API需要我们花很多精力来处理网络通信的细节,消弱了我们对业务的关心。为我们开发带来很多不便性,幸好Java有很多现成的NIO SOCKET框架可供使用,MinaNetty2xSocket等等,这些框架处理了很多底层的通信问题,提供了一些易用的API以供使用。在这个实例中我们使用Netty2来做通信框架。

      定义消息包,消息包有定长包和不定长包,不定长包无非就是要在消息包中加入长度信息,以对收到的网络字节流进行分界。消息包的定义如下

      定义AMF3数据协议的编码和解码器 

    AMF3Encoder
     1 /*
     2  * @(#)AMF3Encoder.java    0.1 05/11/17
     3  *
     4  * Copyright 2010 QISI, Inc. All rights reserved.
     5  * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     6  */
     7 package com.qidea.pushserver.codec;
     8 import java.io.ByteArrayOutputStream;
     9 import org.jboss.netty.buffer.ChannelBuffer;
    10 import org.jboss.netty.buffer.ChannelBuffers;
    11 import org.jboss.netty.channel.Channel;
    12 import org.jboss.netty.channel.ChannelHandlerContext;
    13 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
    14 import com.qidea.pushserver.Constants;
    15 import com.qidea.pushserver.message.CommandMessage;
    16 import com.qidea.pushserver.message.PushMessage;
    17 import flex.messaging.io.SerializationContext;
    18 import flex.messaging.io.amf.Amf3Output;
    19 /**
    20  * 
    21  * @author sunwei
    22  * @version 2010-7-21
    23  * @since JDK1.5
    24  */
    25 public class AMF3Encoder extends OneToOneEncoder
    26 {
    27     /**
    28      * 
    29      */
    30     @Override
    31     protected Object encode(ChannelHandlerContext arg0, Channel arg1,
    32             Object arg2) throws Exception
    33     {
    34         ByteArrayOutputStream stream = new ByteArrayOutputStream();
    35         SerializationContext serializationContext = new SerializationContext();
    36         Amf3Output amf3Output = new Amf3Output(serializationContext);
    37         amf3Output.setOutputStream(stream);
    38         amf3Output.writeObject(arg2);
    39         byte[] objSe = stream.toByteArray();
    40         if (objSe != null && objSe.length > 0)
    41         {
    42             ChannelBuffer buffer = ChannelBuffers.buffer(objSe.length + 8);
    43             if (arg2 instanceof PushMessage)
    44                 buffer.writeInt(Constants.MAGIC_NUM_PUSH_MSG);
    45             else if (arg2 instanceof CommandMessage)
    46                 buffer.writeInt(Constants.MAGIC_NUM_COMMAND_MSG);
    47             buffer.writeInt(objSe.length);
    48             buffer.writeBytes(objSe);
    49             return buffer;
    50         }
    51         return null;
    52     }
    53 }
    AMF3Decoder
     1 /*
     2  * @(#)AMF3Decoder.java    0.1 05/11/17
     3  *
     4  * Copyright 2010 QISI, Inc. All rights reserved.
     5  * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     6  */
     7 package com.qidea.pushserver.codec;
     8 import java.io.ByteArrayInputStream;
     9 import org.jboss.netty.buffer.ChannelBuffer;
    10 import org.jboss.netty.channel.Channel;
    11 import org.jboss.netty.channel.ChannelHandlerContext;
    12 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
    13 import org.slf4j.Logger;
    14 import org.slf4j.LoggerFactory;
    15 import flex.messaging.io.SerializationContext;
    16 import flex.messaging.io.amf.Amf3Input;
    17 /**
    18  * amf3协议解码类
    19  * 
    20  * @author sunwei
    21  * @version 2010-7-21
    22  * @since JDK1.5
    23  */
    24 public class AMF3Decoder extends LengthFieldBasedFrameDecoder
    25 {
    26     public static final Logger logger = LoggerFactory
    27             .getLogger(AMF3Decoder.class);
    28     /**
    29      * 
    30      * @param maxFrameLength
    31      *            包的最大大小
    32      * @param lengthFieldOffset
    33      *            包头信息,长度的偏移位
    34      * @param lengthFieldLength
    35      *            包头信息,长度位数
    36      */
    37     public AMF3Decoder(int maxFrameLength, int lengthFieldOffset,
    38             int lengthFieldLength)
    39     {
    40         super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    41     }
    42     /**
    43      * 
    44      * @param maxFrameLength
    45      */
    46     public AMF3Decoder(int maxFrameLength)
    47     {
    48         super(maxFrameLength, 4400);
    49     }
    50     /**
    51      * 
    52      */
    53     @Override
    54     protected Object decode(ChannelHandlerContext ctx, Channel channel,
    55             ChannelBuffer buffer) throws Exception
    56     {
    57         ChannelBuffer frame = (ChannelBuffer) super
    58                 .decode(ctx, channel, buffer);
    59         if (frame == null)
    60         {
    61             return null;
    62         }
    63         //
    64         int magicNum = frame.readInt();
    65         int dataLength = frame.readInt();
    66         logger.info("magic num={},data length={}", magicNum, dataLength);
    67         // 读AMF3字节流的内容
    68         byte[] content = new byte[frame.readableBytes()];
    69         frame.readBytes(content);
    70         SerializationContext serializationContext = new SerializationContext();
    71         Amf3Input amf3Input = new Amf3Input(serializationContext);
    72         amf3Input.setInputStream(new ByteArrayInputStream(content));
    73         Object message = amf3Input.readObject();
    74         return message;
    75     }
    76 }
    77 

      构建服务端

    PushProtocolHandler
     1 public class PushProtocolHandler extends SimpleChannelHandler
     2 {
     3     public static Logger log = LoggerFactory
     4             .getLogger(PushProtocolHandler.class);
     5     /**
     6      * 
     7      */
     8     @Override
     9     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    10     {
    11         if (e.getMessage() != null)
    12         {
    13             ChannelManager channelManager = PushServerContext
    14                     .getBean(ChannelManager.class);
    15             if (e.getMessage() instanceof CommandMessage)
    16             {
    17                 channelManager.handleMsg((CommandMessage) e.getMessage(), e
    18                         .getChannel());
    19             }
    20             else if (e.getMessage() instanceof PushMessage)
    21             {
    22                 channelManager.handleMsg((PushMessage) e.getMessage(), e
    23                         .getChannel());
    24             }
    25             else
    26             {
    27                 log.warn("unkown message {}", e);
    28             }
    29         }
    30     }
    31 }
    PushServerPipelineFactory
     1 import static org.jboss.netty.channel.Channels.*;
     2 /**
     3  * 
     4  * @author sunwei
     5  * @version 2010-7-22
     6  * @since JDK1.5
     7  */
     8 public class PushServerPipelineFactory implements ChannelPipelineFactory
     9 {
    10     @Override
    11     public ChannelPipeline getPipeline() throws Exception
    12     {
    13         ChannelPipeline pipeline = pipeline();
    14         // 处理日志
    15         pipeline.addLast("logger"new LoggingHandler());
    16         // 处理coder
    17         pipeline.addLast("decoder"new AMF3Decoder(Constants.MAX_OBJECT_SIZE));
    18         pipeline.addLast("encoder"new AMF3Encoder());
    19         //
    20         pipeline.addLast("handler"new PushProtocolHandler());
    21         //
    22         return pipeline;
    23     }
    24 }
    ServerMain
     1 public static main(String[] args)
     2 {
     3         // 开始NIO线程
     4          ChannelFactory factory = new NioServerSocketChannelFactory(Executors
     5                 .newCachedThreadPool(), Executors.newCachedThreadPool());
     6         // 服务启始点
     7     ServerBootstrap bootstrap = new ServerBootstrap(factory);
     8     // 处理过滤器
     9     bootstrap.setPipelineFactory(new PushServerPipelineFactory());
    10     // 设置相关参数
    11     bootstrap.setOption("child.tcpNoDelay"true);
    12     // 设置相关参数
    13     bootstrap.setOption("child.keepAlive"true);
    14     // 绑定相关端口
    15     bootstrap.bind(new InetSocketAddress(getPushPort()));
    16 }

      Flex客户端

    FlexSocket
     1 public class FlexSocket
     2 {
     3  
     4 //发送包
     5         public function send(type:int, obj:PushMessage):Boolean
     6         {
     7             if (_socket == null)
     8             {
     9                 return false;
    10             }
    11             //手动限制不给发送的时候用
    12             if (socketState == socket_state_closed || socketState == socket_state_connecting)
    13             {
    14                 return false;
    15             }
    16             if (!_socket.connected)
    17             {
    18                 return false;
    19             }
    20             var byteArr:ByteArray=objToByteaArray(obj);
    21             var msgHead:MsgHead=new MsgHead(type, byteArr.length);
    22             sendMsg(msgHead.getType(), msgHead.getSize(), byteArr);
    23             return true;
    24         }
    25 
    26 //接受包
    27                 private function getDataHandler(e:ProgressEvent):void
    28         {
    29             _timeServerDead.stop();
    30             _timeServerDead.reset();
    31             if (_socket.bytesAvailable >= 8 && !_isReadHead)
    32             {
    33                 _recvPackageType=_socket.readInt();
    34                 //同意关闭
    35 //                if(_recvPackageType == 5)
    36 //                {
    37 //                    close();
    38 //                }
    39                 _recvPackageSize=_socket.readInt();
    40                 _isReadHead=true;
    41             }
    42             if (_isReadHead && _socket.bytesAvailable >= _recvPackageSize)
    43             {
    44                 var byte:ByteArray=new ByteArray();
    45                 _socket.readBytes(byte0, _recvPackageSize);
    46                 _msgObj=byteArraytoObject(byte);
    47                 //暂时用上面一种 
    48                 if (_recvPackageType == packageType.LOGIN_TYPE)
    49                 {
    50                     if (_msgObj.ret == bodyType.RECEIVE_OK)
    51                     {
    52                         _timerDetectSocket.start();
    53                         socketState=socket_state_connected;
    54                         myEventDispatch.Instence().dispatcher(bodyType.INLINE_CURRENTSOCKETSTATE);
    55                     }
    56                     else if (_msgObj.ret == bodyType.RECEIVE_ERROR)
    57                     {
    58                         close();
    59                     }
    60 
    61                 }
    62                 else if (_recvPackageType == packageType.CHAT_TYPE)
    63                 {
    64                     myEventDispatch.Instence().dispatcher(selectEventName(_recvPackageType), _msgObj);
    65                 }
    66                 _recvPackageSize=0;
    67                 _recvPackageType=0;
    68                 _msgObj=null;
    69                 _isReadHead=false;
    70 
    71 
    72             }
    73         }
    74 
    75 }

      有关Mina的实现,你可以通过本博客向我索取相关代码。 

              

  • 相关阅读:
    UI:UITableView表视图
    UI:页面传值、单例模式传值、属性传值、NSUserDefaults 数据持久化
    UI:UINavigationController、界面通信
    UI:UIScrollView、UIPageControl
    UI:tomcat(说话小程序)、相框动画、UISgmentcontrol、UISwitch
    UI:触摸事件 与 事件的回应
    UI:转自互联网资料
    UI:MVC设计模式
    OC:copy 与 retain 的区别
    UI:数据持久化
  • 原文地址:https://www.cnblogs.com/51cto/p/1819361.html
Copyright © 2011-2022 走看看