zoukankan      html  css  js  c++  java
  • unity探索者之socket传输protobuf字节流(三)

    版权声明:本文为原创文章,转载请声明http://www.cnblogs.com/unityExplorer/p/6986474.html 

    上一篇讲到了数据的处理,这一篇主要讲使用多线程收发消息

      1 //创建消息数据模型
      2 //正式项目中,消息的结构一般是消息长度+消息id+消息主体内容
      3 public class Message
      4 {
      5     public IExtensible protobuf;
      6     public int messageId;
      7 }
      8 
      9 public class SocketClientTemp : MonoBehaviour
     10 {
     11     const int packageMaxLength = 1024;
     12 
     13     Socket mSocket;
     14     Thread threadSend;
     15     Thread threadRecive;
     16     Queue<Message> allMessages = new Queue<Message>();
     17     Queue<byte[]> sendQueue = new Queue<byte[]>();
     18 
     19     public bool Init()
     20     {
     21         //创建一个socket对象
     22         mSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
     23         return SocketConnection("此处是ip", 1111);
     24     }
     25 
     26     void Update()
     27     {
     28         AnalysisMessage();
     29     }
     30 
     31     /// <summary>
     32     /// 建立服务器连接
     33     /// </summary>
     34     /// <param name="ip">服务器的ip地址</param>
     35     /// <param name="port">端口</param>
     36     bool SocketConnection(string ip, int port)
     37     {
     38         try
     39         {
     40             IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port);
     41             //同步连接服务器,实际使用时推荐使用异步连接,处理方式会在下一篇讲断线重连时讲到
     42             mSocket.Connect(ipep);
     43             //连接成功后,创建两个线程,分别用于发送和接收消息
     44             threadSend = new Thread(new ThreadStart(SendMessage));
     45             threadSend.Start();
     46             threadRecive = new Thread(new ThreadStart(ReceiveMessage));
     47             threadRecive.Start();
     48             return true;
     49         }
     50         catch (Exception e)
     51         {
     52             Debug.Log(e.ToString());
     53             Close();
     54             return false;
     55         }
     56     }
     57 
     58     #region ...发送消息
     59     /// <summary>
     60     /// 添加数据到发送队列
     61     /// </summary>
     62     /// <param name="protobufModel"></param>
     63     /// <param name="messageId"></param>
     64     public void AddSendMessageQueue(IExtensible protobufModel, int messageId)
     65     {
     66         sendQueue.Enqueue(BuildPackage(protobufModel, messageId));
     67     }
     68 
     69     void SendMessage()
     70     {
     71         //循环获取发送队列中第一个数据,然后发送到服务器
     72         while (true)
     73         {
     74             if (sendQueue.Count == 0)
     75             {
     76                 Thread.Sleep(100);
     77                 continue;
     78             }
     79             if (!mSocket.Connected)
     80             {
     81                 Close();
     82                 break;
     83             }
     84             else
     85                 Send(sendQueue.Peek());//发送队列中第一条数据
     86         }
     87     }
     88 
     89     void Send(byte[] bytes)
     90     {
     91         try
     92         {
     93             mSocket.Send(bytes, SocketFlags.None);
     94             //发送成功后,从发送队列中移除已发送的消息
     95             sendQueue.Dequeue();
     96         }
     97         catch (SocketException e)
     98         {
     99             //如果错误码为10035,说明服务器缓存区满了,所以等100毫秒再次发送
    100             if (e.NativeErrorCode == 10035)
    101             {
    102                 Thread.Sleep(100);
    103                 Send(bytes);
    104             }
    105             else
    106                 Debug.Log(e.ToString());
    107         }
    108     }
    109     #endregion
    110 
    111     #region ...接收消息
    112     /// <summary>
    113     /// 解析收到的消息
    114     /// </summary>
    115     void AnalysisMessage()
    116     {
    117         while (allMessages.Count > 0)
    118         {
    119             int id = allMessages.Dequeue().messageId;
    120             switch (id)
    121             {
    122                 //根据消息id做不同的处理
    123             }
    124         }
    125     }
    126 
    127     /// <summary>
    128     /// 接收数据
    129     /// </summary>
    130     void ReceiveMessage()
    131     {
    132         while (true)
    133         {
    134             if (!mSocket.Connected)
    135                 break;
    136             byte[] recvBytesHead = GetBytesReceive(4);
    137             int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(recvBytesHead, 0));
    138             byte[] recvBytesBody = GetBytesReceive(bodyLength);
    139 
    140             byte[] messageId = new byte[4];
    141             Array.Copy(recvBytesBody, 0, messageId, 0, 4);
    142             byte[] messageBody = new byte[bodyLength - 4];
    143             Array.Copy(recvBytesBody, 4, messageBody, 0, bodyLength - 4);
    144 
    145             if (BitConverter.IsLittleEndian)
    146                 Array.Reverse(messageId);
    147             FillAllPackages(BitConverter.ToInt32(messageId, 0), messageBody);
    148         }
    149     }
    150 
    151     /// <summary>
    152     /// 填充接收消息队列
    153     /// </summary>
    154     /// <param name="messageId"></param>
    155     /// <param name="messageBody"></param>
    156     void FillAllPackages(int messageId, byte[] messageBody)
    157     {
    158         switch (messageId)
    159         {
    160             //根据消息id处理消息,并添加到接收消息队列
    161             case 1:
    162                 allMessages.Enqueue(new Message() 
    163                 {
    164                     protobuf = ProtobufSerilizer.DeSerialize<TestTemp>(messageBody), 
    165                     messageId = messageId 
    166                 });
    167                 break;
    168         }
    169     }
    170 
    171     /// <summary>
    172     /// 接收数据并处理
    173     /// </summary>
    174     /// <param name="length"></param>
    175     /// <returns></returns>
    176     byte[] GetBytesReceive(int length)
    177     {
    178         byte[] recvBytes = new byte[length];
    179         while (length > 0)
    180         {
    181             byte[] receiveBytes = new byte[length < packageMaxLength ? length : packageMaxLength];
    182             int iBytesBody = 0;
    183             if (length >= receiveBytes.Length)
    184                 iBytesBody = mSocket.Receive(receiveBytes, receiveBytes.Length, 0);
    185             else
    186                 iBytesBody = mSocket.Receive(receiveBytes, length, 0);
    187             receiveBytes.CopyTo(recvBytes, recvBytes.Length - length);
    188             length -= iBytesBody;
    189         }
    190         return recvBytes;
    191     }
    192     #endregion
    193 
    194     /// <summary>
    195     /// 构建消息数据包
    196     /// </summary>
    197     /// <param name="protobufModel"></param>
    198     /// <param name="messageId"></param>
    199     byte[] BuildPackage(IExtensible protobufModel, int messageId)
    200     {
    201         byte[] b;
    202         if (protobufModel != null)
    203             b = ProtobufSerilizer.Serialize(protobufModel);
    204         else
    205             b = new byte[0];
    206         //消息长度(int数据,长度4) + 消息id(int数据,长度4) + 消息主体内容
    207         ByteBuffer buf = ByteBuffer.Allocate(b.Length + 4 + 4);
    208         //消息长度 = 消息主体内容长度 + 消息id长度
    209         buf.WriteInt(b.Length + 4);
    210         buf.WriteInt(messageId);
    211 
    212         if (protobufModel != null)
    213             buf.WriteBytes(b);
    214         return buf.GetBytes();
    215     }
    216 
    217     void OnDestroy()
    218     {
    219         //停止运行后,如果不关闭socket多线程,再次运行时,unity会卡死
    220         Close();
    221     }
    222 
    223     /// <summary>
    224     /// 关闭socket,终止线程
    225     /// </summary>
    226     public void Close()
    227     {
    228         if (mSocket != null)
    229         {
    230             //微软官方推荐在关闭socket前先shutdown,但是经过测试,发现网络断开后,shutdown会无法执行
    231             if (mSocket.Connected)
    232                 mSocket.Shutdown(SocketShutdown.Both);
    233             mSocket.Close();
    234             mSocket = null;
    235         }
    236         //关闭线程
    237         if (threadSend != null)
    238             threadSend.Abort();
    239         if (threadRecive != null)
    240             threadRecive.Abort();
    241         threadSend = null;
    242         threadRecive = null;
    243     }
    244 }

    到这里,使用socket处理消息的收发就基本结束了,但是,某些项目为了增强体验,可能还会增加断线重连的功能,这个功能会在下一篇讲到

    由于环境不同关系,并非所有的博客内容都会上传完整的源码,大部分的源码,大家可以到我的github主页上的UGCFramework查找

    传送门:https://github.com/wulonghao/UGCFramework
  • 相关阅读:
    应用程序池的配置 狼
    SQL跨数据库复制表数据 狼
    <script language= "javascript " for= "window " event= "onload "> 狼
    禁止虚拟目录继承根目录下web.config中的有些配置 web.config的继承禁止方法 狼
    linux 在程序里修改系统时间
    xlinux下载地址
    安装tslib1.4出现的问题汇总
    linux之看门狗 (转)
    VC中显示GIF图片
    mdev 自动挂载U盘成功
  • 原文地址:https://www.cnblogs.com/unityExplorer/p/6986474.html
Copyright © 2011-2022 走看看