这是2014年在csdn上写的,现在迁过来
最近在GitHub上发现了FastSocket框架。初步用了一下发现实现很简单。几个Command类把需要执行的命令封闭起来,框架实现了协议,分包等功能。用户无需关心。用了一段时间发现少了两个比较主要的功能,心跳,回调。网上也没找到相关的文档。干脆自己实现。
一、首先是Server端。
第1步添加一个CallbackCommand
public class CallbackCommand : ICommand<AsyncBinaryCommandInfo> { public void ExecuteCommand(IConnection connection, AsyncBinaryCommandInfo commandInfo) { if (commandInfo.Buffer == null || commandInfo.Buffer.Length == 0) { connection.BeginDisconnect(); return; } commandInfo.Reply(connection, new byte[] { 0x00 }); var v = MyService._connection.Find(model => model.ConnectionID == connection.ConnectionID); v.callBack = commandInfo; } public string Name { get { return "Callback"; } } }
第2步 客户端连入的时候将connection信息记录进一个Hash表。
public override void OnConnected(IConnection connection) { base.OnConnected(connection); _connection.Add(connection); }
第3步 IConnection接口加入三个属性。
/// <summary> /// 当前连接的最后心跳时间 /// </summary> DateTime HeartTime { get; set; } /// <summary> /// 获取或设置与用户数据 /// </summary> object UserData { get; set; } object callBack { get; set; }
最后心跳时间、客户端接入的用户信息、以及回调对象。
第4步 添加两个子线程,用于测试回调及定时心跳,这两个子,这两个子线程 可以在服务启动的时候直接启动。
void TestSend() //主动推送消息到客户端 { byte[] buffer = new byte[] { 0x00, 0x99, 0xAA, 0xFF }; Packet packet = PacketBuilder.ToAsyncBinary("TestSend", 0, buffer); while (true) { foreach (var v in _connection) { try { ((AsyncBinaryCommandInfo)v.callBack).Reply(v, buffer); } catch (Exception ex) { } } System.Threading.Thread.Sleep(1000); } } /// <summary> /// 心跳线程 /// </summary> void SocketHeart() { while (!Globals.IsStop) { lock (_connection) { for (int i = _connection.Count - 1; i >= 0; i--) if (_connection[i].HeartTime < DateTime.Now.AddSeconds(0 - (30 * 3)))//超过三个心跳包,干掉 { MyService.WriteLog("Action:"RemoveConnection"", _connection[i]); _connection.RemoveAt(i); } } System.Threading.Thread.Sleep(1000 * 10); } }
Server端到这里就可以了。接下来的客户端。
客户端这里需要解释一下。客户端在发送一个指令到服务器端的同时,会启动一个监听,因为每一个指令都有一个唯一的SeqlID,客户端在发送后启动一个监听,收到来自Server端的回复,并检查SeqID相同以后会清除这个监听。因为回调不允许清除监听,否则将不能接收到消息。那我就从这个角度考虑,把问题解决掉。
问题的关键在于BaseSocketClient类的OnMessageReceied方法,其中有一个 this._requestCollection.Remove(response.SeqID); 其中 _requestCollection就是回调队列,Remove方法就是找到这个回调并加以删除。
protected override void OnMessageReceived(IConnection connection, MessageReceivedEventArgs e) { base.OnMessageReceived(connection, e); int readlength; TResponse response = null; try { response = this._protocol.FindResponse(connection, e.Buffer, out readlength); } catch (Exception ex) { this.OnError(connection, ex); connection.BeginDisconnect(ex); e.SetReadlength(e.Buffer.Count); return; } if (response != null) { this.OnResponse(connection, response) <span style="">var request = this._requestCollection.Remove(response.SeqID);</span> if (request == null) ThreadPool.QueueUserWorkItem(_ => { try { this.HandleUnknowResponse(connection, response); } catch (Exception ex) { SocketBase.Log.Trace.Error(ex.Message, ex); } }); else ThreadPool.QueueUserWorkItem(_ => { try { request.SetResult(response); } catch (Exception ex) { SocketBase.Log.Trace.Error(ex.Message, ex); } }); } e.SetReadlength(readlength); }
接下来找到Remove方法,将Remove加以修改。因为客户端启动的第一件事情就是执行回调操作,所以SeqID == 1 ,我们就把这个SeqID一直保留,同样找到对_dic执行Remove操作的地方加以修改。
public Request<TResponse> Remove(int seqID) { Request<TResponse> removed; if (seqID == 1 && this._dic.ContainsKey(seqID)) return this._dic[seqID]; this._dic.TryRemove(seqID, out removed); return removed; }
倒数第二步。AsyncBinarySocketClient类里面添加回调操作。这里我做了一个回调事件。
public delegate void OnCallback(byte[] buffer); public event OnCallback Callback = null;
并在Send方法里面添加该事件的实现。
public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, byte[] payload, Func<AsyncBinaryResponse, TResult> funcResultFactory, object asyncState = null) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory"); var seqID = base.NextRequestSeqID(); var cmdLength = cmdName.Length; var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + 6; var sendBuffer = new byte[messageLength + 4]; //write message length Buffer.BlockCopy(NetworkBitConverter.GetBytes(messageLength), 0, sendBuffer, 0, 4); //write seqID. Buffer.BlockCopy(NetworkBitConverter.GetBytes(seqID), 0, sendBuffer, 4, 4); //write response flag length. Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)cmdLength), 0, sendBuffer, 8, 2); //write response flag Buffer.BlockCopy(Encoding.ASCII.GetBytes(cmdName), 0, sendBuffer, 10, cmdLength); //write body buffer if (payload != null && payload.Length > 0) Buffer.BlockCopy(payload, 0, sendBuffer, 10 + cmdLength, payload.Length); var source = new TaskCompletionSource<TResult>(asyncState); base.Send(new Request<Response.AsyncBinaryResponse>(consistentKey, seqID, cmdName, sendBuffer, ex => source.TrySetException(ex), response => { TResult result; try { result = funcResultFactory(response); } catch (Exception ex) { source.TrySetException(ex); return; } source.TrySetResult(result); <span style=""> if(cmdName == "Callback" && Callback != null) { Callback(result as byte[]); } </span> })); return source.Task; }
最后一步,客户端实现这个回调事件,并发送回调操作到服务器。
Globals.client.RegisterServerNode("127.0.0.1:8401", new System.Net.IPEndPoint(System.Net.IPAddress.Parse("127.0.0.1"), 8401)); Globals.client.Callback += client_Callback; Globals.client.Send("Callback", new byte[] { 0x00 }, res => res.Buffer);