zoukankan      html  css  js  c++  java
  • 极简风格网络消息以及分发架构

      之前有一次情况特殊, 只能手撸网络框架代码, 因为时间紧张就需要一个非常极简风格的消息和分发框架了, 只要把网络的发送逻辑跟接收入口跟分发中心绑定, 然后所有操作只需要跟消息中心交互就行了, 就像下面这样 : 

        public class MyIP : INetMessage
        {
            public string ipAddress;
            public int port;
        }
        
        // 接收
        private static void OnReceviedInfo(MyIP info)
        {
            Debug.Log("MyIP " + info.ipAddress);
        }
        MsgCenter.instance.Register<MyIP>(OnReceviedInfo);
        
        // 发送
        MsgCenter.instance.SendNetMessage(new MyIP() { ipAddress = "192.168.1.12" });

      就只需要定义一个类型继承 INetMessage, 然后注册接收回调, 发送的时候直接调用消息中心进行发送即可, 对于临时需求来说非常方便了.

      然后消息中心里面最核心的就是消息的自动注册和分发了, 它需要有一个把对象序列化的工具, 可以是Protobuf, Json序列化等各种工具, 这里因为对数据和效率要求不高, 并且减少工序, 直接用对象转Json然后转字节的方式来了 : 

        // 对象转字节数组  
        public static byte[] StructToBytes<T>(T structObj)
        {
            var jsonStr = LitJson.JsonMapper.ToJson(structObj);
            var sendBytes = System.Text.Encoding.UTF8.GetBytes(jsonStr);
            return sendBytes;
        }
    
        // 字节转对象
        public static object BytesToStruct(byte[] bytes, int startIndex, int length, Type strcutType)
        {
            var jsonStr = System.Text.Encoding.UTF8.GetString(bytes, startIndex, length);
            var tag = LitJson.JsonMapper.ToObject(jsonStr, strcutType);        // 这个接口插件没有提供, 不过它的函数已经支持这个方法了, 自己添加即可
            return tag;
        }

      对象能够转换字节之后, 就是添加消息收发的逻辑了, 这里使用了类型的名称作为ID, 自动反序列化的关联性就在ID上, 为了极简方案所以使用类名作为ID, 只要类名不重复即可 : 

            // 对象转为发送字节
            public static byte[] StructToSendBuffer<T>(T structObj) where T : INetMessage
            {
                var sendData = StructToBytes<T>(structObj);
                int sendDataLen = sendData.Length;
                var id = typeof(T).Name;
                var sendID = System.Text.Encoding.UTF8.GetBytes(id);
                int sendIDLen = sendID.Length;
                int totalLen = 4 + 4 + sendIDLen + sendDataLen;
                var sendBuffer = new byte[totalLen];
    
                Array.Copy(BitConverter.GetBytes(totalLen), sendBuffer, 4);
                Array.Copy(BitConverter.GetBytes(sendIDLen), 0, sendBuffer, 4, 4);
                Array.Copy(sendID, 0, sendBuffer, 8, sendIDLen);
                Array.Copy(sendData, 0, sendBuffer, 8 + sendIDLen, sendData.Length);
    
                return sendBuffer;
            }

      发送结构就如下图 : 

      第一个 byte[4] 代表消息总长度, 这样就可以解决分包粘包的问题了, 不过我们这个临时的都是小消息, 就没有对它进行处理.

      第二个 byte[4] 代表后面ID的长度, 因为ID使用的是类名称, 长度不一定, 所以要添加这个.

      第三个 ID 就是类名.

      第四个 就是对象的序列化了, 它的长度可以用 (总长-8-ID长度) 得到, 也很简单.

      所以发送数据就这样简单完成了, 消息中心的发送可以直接一个代理绑定即可 : 

    public System.Action<byte[]> _sendMessageByte = null;
    [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
    public void SendNetMessage<T>(T message) where T : INetMessage
    {
        if(_sendMessageByte != null)
        {
            var sendBuffer = StructToSendBuffer<T>(message);
            _sendMessageByte.Invoke(sendBuffer);
        }
    }

      接下来就是非常有意思的接收到数据之后的反序列化了, 因为所有消息都是继承 INetMessage 的, 所以可以通过 Assembly 获取全部消息对象, 然后创建反序列化回调就行了, 看下面的代码 : 

    Dictionary<string, System.Func<byte[], int, int, INetMessage>> _messageDeserializer = new ......
    
    private void InitDeserializeFunc()
    {
        var baseMsgType = typeof(INetMessage);
        foreach(var assembly in AppDomain.CurrentDomain.GetAssemblies())
        {
            var types = assembly.GetTypes();
            foreach(var type in types)
            {
                var interfaces = type.GetInterface(baseMsgType.Name);
                if(interfaces != null)
                {
                    _messageDeserializer[type.Name] = (_bytes, _startIndex, _length) =>
                    {
                        var data = BytesToStruct(_bytes, _startIndex, _length, type);
                        if(data != null)
                        {
                            return (INetMessage)data;
                        }
                        return null;
                    };
                }
            }
        }    
    }
        [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void ReceiveNetMessage(byte[] message)
        {
            try
            {
                int len = System.BitConverter.ToInt32(message, 0);
                int idLen = System.BitConverter.ToInt32(message, 4);
                string id = System.Text.Encoding.UTF8.GetString(message, 8, idLen);
                var func = _messageDeserializer.TryGetNullableValue(id);
                if(func != null)
                {
                    var msg = func.Invoke(message, 8 + idLen, len - 8 - idLen);
                    if(msg != null)
                    {
                        lock(_receiveLock)
                        {
                            _receivedMessage.Add(msg);
                        }
                    }
                }
            }
            catch { }
        }

      这里 ReceiveNetMessage 是对接到网络接收端的, 省掉了对分包/粘包这些的逻辑, 这里从消息中读取出了 ID, 然后从容器里面找到反序列化方法, 然后反序列化到已接收队列中. 

      消息分发的逻辑就使用了比较低效的 DynamicInvoke 的方式, 通过注册回调来自动分发 : 

    Dictionary<Type, Delegate> _msgDispatcher = new Dictionary<Type, Delegate>();
    
    public void Register<T>(System.Action<T> action) where T : INetMessage
    {
        var type = typeof(T);
        var call = _msgDispatcher.TryGetNullableValue(type);
        if(call == null)
        {
            _msgDispatcher[type] = action;
        }
        else
        {
            _msgDispatcher[type] = (Delegate.Combine(call, action));
        }
    }
    private void Update()
    {
        if(_receivedMessage.Count > 0)
        {
            lock(_receiveLock)
            {
                for(int i = 0; i < _receivedMessage.Count; i++)
                {
                    var msg = _receivedMessage[i];
                    var type = msg.GetType();
                    var call = _msgDispatcher.TryGetNullableValue(type);
                    if(call != null)
                    {
                        call.DynamicInvoke(msg);
                    }
                }
                _receivedMessage.Clear();
            }
        }
    }

      这个 Update 如果是 Unity 的生命周期的话, 那么接收消息回调就是在主线程里面的了, 如果是开了一个线程来 Tick 这个 Update 的话, 它就是在非主线程的了, 这个需要注意, 为了方便起见一般消息中心就继承 MonoBehaviour 了.  

      以上, 极简风格, 极垃圾的性能(可以优化抢救的)...

    --------------- 完整代码 -----------------

    using System;
    using System.Text;
    using System.Collections;
    using System.Collections.Generic;
    using System.Runtime.InteropServices;
    using UnityEngine;
    using System.Reflection;
    
    namespace NetWorkingModule
    {
        public class MsgCenter : Singleton<MsgCenter>
        {
            private static readonly Dictionary<string, System.Func<byte[], int, int, INetMessage>> _messageDeserializer
                = new Dictionary<string, System.Func<byte[], int, int, INetMessage>>();
            private readonly Dictionary<Type, Delegate> _msgDispatcher = new Dictionary<Type, Delegate>();
    
            private static volatile List<INetMessage> _receivedMessage = new List<INetMessage>();
            private static object _receiveLock = new object();
    
            public System.Action<byte[]> _sendMessageByte = null;
            public event System.Action<byte[]> sendMessageByte
            {
                add { _sendMessageByte += value; }
                remove { _sendMessageByte -= value; }
            }
    
            private System.Threading.Thread _update = null;
    
            #region Override Funcs
            protected override void Initialize()
            {
                InitDeserializeFunc();
            }
    
            protected override void UnInitialize()
            {
            }
            #endregion
    
            #region Main Funcs
            private void InitDeserializeFunc()
            {
                var baseMsgType = typeof(INetMessage);
                foreach(var assembly in AppDomain.CurrentDomain.GetAssemblies())
                {
                    var types = assembly.GetTypes();
                    foreach(var type in types)
                    {
                        var interfaces = type.GetInterface(baseMsgType.Name);
                        if(interfaces != null)
                        {
                            _messageDeserializer[type.Name] = (_bytes, _startIndex, _length) =>
                            {
                                var data = BytesToStruct(_bytes, _startIndex, _length, type);
                                if(data != null)
                                {
                                    return (INetMessage)data;
                                }
                                return null;
                            };
                        }
                    }
                }
                if(_update != null)
                {
                    _update.Abort();
                }
                _update = new System.Threading.Thread(() =>
                {
                    while(true)
                    {
                        Update();
                        System.Threading.Thread.Sleep(10);
                    }
                });
                _update.Start();
            }
    
            [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
            public void SendNetMessage<T>(T message) where T : INetMessage
            {
                if(_sendMessageByte != null)
                {
                    var sendBuffer = StructToSendBuffer<T>(message);
                    _sendMessageByte.Invoke(sendBuffer);
                }
            }
    
            [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
            public void ReceiveNetMessage(byte[] message)
            {
                try
                {
                    int len = System.BitConverter.ToInt32(message, 0);
                    int idLen = System.BitConverter.ToInt32(message, 4);
                    string id = System.Text.Encoding.UTF8.GetString(message, 8, idLen);
                    var func = _messageDeserializer.TryGetNullableValue(id);
                    if(func != null)
                    {
                        var msg = func.Invoke(message, 8 + idLen, len - 8 - idLen);
                        if(msg != null)
                        {
                            lock(_receiveLock)
                            {
                                _receivedMessage.Add(msg);
                            }
                        }
                    }
                }
                catch { }
            }
    
            public void Register<T>(System.Action<T> action) where T : INetMessage
            {
                var type = typeof(T);
                var call = _msgDispatcher.TryGetNullableValue(type);
                if(call == null)
                {
                    _msgDispatcher[type] = action;
                }
                else
                {
                    _msgDispatcher[type] = (Delegate.Combine(call, action));
                }
            }
            public void UnRegister<T>(System.Action<T> action) where T : INetMessage
            {
                var type = typeof(T);
                var call = _msgDispatcher.TryGetNullableValue(type);
                if(call != null)
                {
                    _msgDispatcher[type] = Delegate.Remove(call, action);
                }
            }
            #endregion
    
            #region Help Funcs
            // 对象转为发送字节
            public static byte[] StructToSendBuffer<T>(T structObj) where T : INetMessage
            {
                var sendData = StructToBytes<T>(structObj);
                int sendDataLen = sendData.Length;
                var id = typeof(T).Name;
                var sendID = System.Text.Encoding.UTF8.GetBytes(id);
                int sendIDLen = sendID.Length;
                int totalLen = 4 + 4 + sendIDLen + sendDataLen;
                var sendBuffer = new byte[totalLen];
    
                Array.Copy(BitConverter.GetBytes(totalLen), sendBuffer, 4);
                Array.Copy(BitConverter.GetBytes(sendIDLen), 0, sendBuffer, 4, 4);
                Array.Copy(sendID, 0, sendBuffer, 8, sendIDLen);
                Array.Copy(sendData, 0, sendBuffer, 8 + sendIDLen, sendData.Length);
    
                return sendBuffer;
            }
    
            // 对象转字节数组  
            public static byte[] StructToBytes<T>(T structObj)
            {
                var jsonStr = LitJson.JsonMapper.ToJson(structObj);
                var sendBytes = System.Text.Encoding.UTF8.GetBytes(jsonStr);
                return sendBytes;
            }
    
            // 字节转对象
            public static object BytesToStruct(byte[] bytes, int startIndex, int length, Type strcutType)
            {
                var jsonStr = System.Text.Encoding.UTF8.GetString(bytes, startIndex, length);
                var tag = LitJson.JsonMapper.ToObject(jsonStr, strcutType);
                return tag;
            }
            #endregion
    
            private void Update()
            {
                if(_receivedMessage.Count > 0)
                {
                    lock(_receiveLock)
                    {
                        for(int i = 0; i < _receivedMessage.Count; i++)
                        {
                            var msg = _receivedMessage[i];
                            var type = msg.GetType();
                            var call = _msgDispatcher.TryGetNullableValue(type);
                            if(call != null)
                            {
                                Core.ThreadMaster.RunOnMainThread(()=> { call.DynamicInvoke(msg); });
                            }
                        }
                        _receivedMessage.Clear();
                    }
                }
            }
        }
    }

    PS : 为了在编辑器等情况下使用, 继承改为普通的 SingleTon 了, Update 的更新也进了工作线程里, 回调通过 SynchronizationContext 来回到主线程, 性能更惨了哈哈 

      

    也贴一下标记主线程的方法吧 : 

        public static int MainThreadId
        {
            get; private set;
        }
        public static SynchronizationContext UnitySynchronizationContext
        {
            get; private set;
        }
    
    #if UNITY_EDITOR
        [UnityEditor.InitializeOnLoadMethod()]
    #endif
        [RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
        private static void Init()
        {
            UnitySynchronizationContext = SynchronizationContext.Current;
            MainThreadId = Thread.CurrentThread.ManagedThreadId;
            Debug.Log("Main Thread ID : " + MainThreadId);
        }
        
        public static void RunOnMainThread(System.Action call)
        {
            if(Thread.CurrentThread.ManagedThreadId == MainThreadId)
            {
                call.Invoke();
            }
            else
            {
                UnitySynchronizationContext.Post((_state) => { call.Invoke(); }, null);
            }
        }
  • 相关阅读:
    占位博客
    占位博客
    占位博客
    占位
    占位
    占位
    占位
    占位
    python字典设置初始值setdefault()与get()
    自然语言处理如何入门
  • 原文地址:https://www.cnblogs.com/tiancaiwrk/p/15213628.html
Copyright © 2011-2022 走看看