zoukankan      html  css  js  c++  java
  • FastSocket框架的回调实现

    这是2014年在csdn上写的,现在迁过来

    最近在GitHub上发现了FastSocket框架。初步用了一下发现实现很简单。几个Command类把需要执行的命令封闭起来,框架实现了协议,分包等功能。用户无需关心。用了一段时间发现少了两个比较主要的功能,心跳,回调。网上也没找到相关的文档。干脆自己实现。

    一、首先是Server端。

    第1步添加一个CallbackCommand

      public class CallbackCommand : ICommand<AsyncBinaryCommandInfo>
        {
            public void ExecuteCommand(IConnection connection, AsyncBinaryCommandInfo commandInfo)
            {
                if (commandInfo.Buffer == null || commandInfo.Buffer.Length == 0)
                {
                    connection.BeginDisconnect();
                    return;
                }
                commandInfo.Reply(connection, new byte[] { 0x00 });
                var v = MyService._connection.Find(model => model.ConnectionID == connection.ConnectionID);
                v.callBack = commandInfo;
    
            }
            public string Name { get { return "Callback"; } }
        }
    

      

    第2步 客户端连入的时候将connection信息记录进一个Hash表。

            public override void OnConnected(IConnection connection)
            {
                base.OnConnected(connection);
                _connection.Add(connection);
            }
     
    

      

    第3步 IConnection接口加入三个属性。

            /// <summary>
            /// 当前连接的最后心跳时间
            /// </summary>
            DateTime HeartTime { get; set; }
            /// <summary>
            /// 获取或设置与用户数据
            /// </summary>
            object UserData { get; set; }
    
            object callBack { get; set; }
    
    
     
    

      

    最后心跳时间、客户端接入的用户信息、以及回调对象。

    第4步 添加两个子线程,用于测试回调及定时心跳,这两个子,这两个子线程 可以在服务启动的时候直接启动。

            void TestSend() //主动推送消息到客户端
            {
                byte[] buffer = new byte[] { 0x00, 0x99, 0xAA, 0xFF };
                Packet packet = PacketBuilder.ToAsyncBinary("TestSend", 0, buffer);
    
                while (true)
                {
                    foreach (var v in _connection)
                    {
                        try
                        {
                            ((AsyncBinaryCommandInfo)v.callBack).Reply(v, buffer);
                        }
                        catch (Exception ex) { }
                    }
    
                    System.Threading.Thread.Sleep(1000);
                }
            }
            /// <summary>
            /// 心跳线程
            /// </summary>
            void SocketHeart()
            {
                while (!Globals.IsStop)
                {
                    lock (_connection)
                    {
                        for (int i = _connection.Count - 1; i >= 0; i--)
                            if (_connection[i].HeartTime < DateTime.Now.AddSeconds(0 - (30 * 3)))//超过三个心跳包,干掉
                            {
                                MyService.WriteLog("Action:"RemoveConnection"", _connection[i]);
                                _connection.RemoveAt(i);
                            }
                    }
                    System.Threading.Thread.Sleep(1000 * 10);
                }
            }
    
    
     
    
     
    

      

    Server端到这里就可以了。接下来的客户端。

    客户端这里需要解释一下。客户端在发送一个指令到服务器端的同时,会启动一个监听,因为每一个指令都有一个唯一的SeqlID,客户端在发送后启动一个监听,收到来自Server端的回复,并检查SeqID相同以后会清除这个监听。因为回调不允许清除监听,否则将不能接收到消息。那我就从这个角度考虑,把问题解决掉。


    问题的关键在于BaseSocketClient类的OnMessageReceied方法,其中有一个 this._requestCollection.Remove(response.SeqID); 其中 _requestCollection就是回调队列,Remove方法就是找到这个回调并加以删除。

    protected override void OnMessageReceived(IConnection connection, MessageReceivedEventArgs e)
            {
                base.OnMessageReceived(connection, e);
    
                int readlength;
                TResponse response = null;
                try
                {
                    response = this._protocol.FindResponse(connection, e.Buffer, out readlength);
                }
                catch (Exception ex)
                {
                    this.OnError(connection, ex);
                    connection.BeginDisconnect(ex);
                    e.SetReadlength(e.Buffer.Count);
                    return;
                }
    
                if (response != null)
                {
                    this.OnResponse(connection, response)
    
                    <span style="">var request = this._requestCollection.Remove(response.SeqID);</span>
    
                    if (request == null)
                        ThreadPool.QueueUserWorkItem(_ =>
                        {
                            try { this.HandleUnknowResponse(connection, response); }
                            catch (Exception ex) { SocketBase.Log.Trace.Error(ex.Message, ex); }
                        });
                    else
                        ThreadPool.QueueUserWorkItem(_ =>
                        {
                            try { request.SetResult(response); }
                            catch (Exception ex) { SocketBase.Log.Trace.Error(ex.Message, ex); }
                        });
                }
                e.SetReadlength(readlength);
            }
     
    
     
    

      

    接下来找到Remove方法,将Remove加以修改。因为客户端启动的第一件事情就是执行回调操作,所以SeqID == 1 ,我们就把这个SeqID一直保留,同样找到对_dic执行Remove操作的地方加以修改。

       public Request<TResponse> Remove(int seqID)
                {
                    Request<TResponse> removed;
                    if (seqID == 1 && this._dic.ContainsKey(seqID)) return this._dic[seqID];
    
                    this._dic.TryRemove(seqID, out removed);
                    return removed;
                }
    

      


    倒数第二步。AsyncBinarySocketClient类里面添加回调操作。这里我做了一个回调事件。

            public delegate void OnCallback(byte[] buffer);
            public event OnCallback Callback = null;
    

      

    并在Send方法里面添加该事件的实现。

     public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, byte[] payload, Func<AsyncBinaryResponse, TResult> funcResultFactory, object asyncState = null)
            {
                if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName");
                if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory");
    
                var seqID = base.NextRequestSeqID();
                var cmdLength = cmdName.Length;
                var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + 6;
                var sendBuffer = new byte[messageLength + 4];
    
                //write message length
                Buffer.BlockCopy(NetworkBitConverter.GetBytes(messageLength), 0, sendBuffer, 0, 4);
                //write seqID.
                Buffer.BlockCopy(NetworkBitConverter.GetBytes(seqID), 0, sendBuffer, 4, 4);
                //write response flag length.
                Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)cmdLength), 0, sendBuffer, 8, 2);
                //write response flag
                Buffer.BlockCopy(Encoding.ASCII.GetBytes(cmdName), 0, sendBuffer, 10, cmdLength);
                //write body buffer
                if (payload != null && payload.Length > 0)
                    Buffer.BlockCopy(payload, 0, sendBuffer, 10 + cmdLength, payload.Length);
    
                var source = new TaskCompletionSource<TResult>(asyncState);
                base.Send(new Request<Response.AsyncBinaryResponse>(consistentKey, seqID, cmdName, sendBuffer, ex => source.TrySetException(ex),
                    response =>
                    {
                        TResult result;
                        try { result = funcResultFactory(response); }
                        catch (Exception ex) { source.TrySetException(ex); return; }
    
                        source.TrySetResult(result);  
    <span style="">                    if(cmdName == "Callback" && Callback != null)
                        {
                            Callback(result as byte[]);
                        }
    </span>                }));
                return source.Task;
            }
    
    
     
    

      

    最后一步,客户端实现这个回调事件,并发送回调操作到服务器。

                Globals.client.RegisterServerNode("127.0.0.1:8401", new System.Net.IPEndPoint(System.Net.IPAddress.Parse("127.0.0.1"), 8401));
                Globals.client.Callback += client_Callback;
                Globals.client.Send("Callback", new byte[] { 0x00 }, res => res.Buffer);
    

      

    到这里基本就完成了。
  • 相关阅读:
    分布式01-Dubbo基础背景
    项目总结17-使用layui table分页表格
    项目总结16-创建验证码图片
    Springboot学习07-数据源Druid
    Springboot学习06-Spring AOP封装接口自定义校验
    Springboot学习05-自定义错误页面完整分析
    Springboot学习04-默认错误页面加载机制源码分析
    Springboot学习03-SpringMVC自动配置
    项目总结15:JavaScript模拟表单提交(实现window.location.href-POST提交数据效果)
    Springboot学习02-webjars和静态资源映射规则
  • 原文地址:https://www.cnblogs.com/shushukui/p/9295106.html
Copyright © 2011-2022 走看看