zoukankan      html  css  js  c++  java
  • MMF循环队列实现RPC

    为内存映射文件模块单独弄设计的,用于MMF上实现多进程见消息传输;测试中发现命名管道的效率很差,性能还不如用Socket Loopback,决定试试MMF。

    测试了下性能还不错,在我机器上:测试每个包4K 每秒可以执行近100w写入+读取操作,也就是每秒8G的数据操作,可以满足觉大多数业务需求。

    唯一不足在于全局锁,无法利用多核;不过可以使用多个MMF块绕过,无锁编程还是太复杂了。

      1  public unsafe class LoopMemoryStream
      2         {
      3             private  readonly int PREHEAD = sizeof(int*) * 2;
      4             private readonly byte* _dataArea;
      5             private readonly byte* _start;
      6             private readonly byte* _end;
      7             private readonly int _totalLen;
      8 
      9             private int* _head;//save next available byte offset index
     10             private int* _tail; //save first data byte offset index
     11 
     12             public LoopMemoryStream(byte* dataArea, int length)
     13             {
     14                 _dataArea = dataArea;
     15                 _totalLen = length - PREHEAD;
     16 
     17                 _head = (int*)_dataArea;
     18                 _tail = (int*)_dataArea + 1;
     19 
     20                 _start = dataArea + PREHEAD;
     21                 _end = dataArea + length;
     22             }
     23 
     24             public int* Head { get { return _head; } }
     25             public int* Tail { get { return _tail; } }
     26             public bool IsEmpty { get { return *_head == *_tail; } }
     27             public int DataLen { get { return _totalLen - AvailLen; } }
     28             public int AvailLen
     29             {
     30                 get
     31                 {
     32                     int diff = *_head - *_tail;
     33                     return diff >= 0 ? _totalLen - diff : -diff;
     34                 }
     35             }
     36 
     37             public void ClearData()
     38             {
     39                 *_head = 0;
     40                 *_tail = 0;
     41             }
     42 
     43             public void Write(byte[] data, int offSize, int length)
     44             {
     45                 if (AvailLen < length + 4)
     46                     throw new ArgumentException();
     47 
     48                 WriteInt32(length);
     49                 WriteBytes(data, offSize, length);
     50             }
     51 
     52             public byte[] Read()
     53             {
     54                 if (DataLen < 4)
     55                     throw new ArgumentException();
     56 
     57                 int len = GetInt32();
     58 
     59                 if (DataLen < len)
     60                     throw new ArgumentException();
     61 
     62                 return ReadBytes(len);
     63             }
     64 
     65             public byte[] ReadBytes(int length)
     66             {
     67                 byte[] data = new byte[length];
     68                 fixed (byte* pd = data)
     69                 {
     70                     if (*_tail > *_head && _totalLen - *_tail - 1 <= length)
     71                     {
     72                         for (int i = 0; i < length; i++)
     73                         {
     74                             *(pd + i) = *(_start + *_tail);
     75 
     76                             if (*_tail == _totalLen - 1)
     77                                 *_tail = 0;
     78                             else
     79                                 (*_tail)++;
     80                         }
     81                     }
     82                     else
     83                     {
     84                         MemCopy(_start + *_tail, pd, length);
     85                         *_tail += length;
     86                     }
     87                 }
     88 
     89                 return data;
     90             }
     91 
     92             private void WriteBytes(byte[] data, int offSize, int length)
     93             {
     94                 int end = offSize + length;
     95                 fixed (byte* pd = data)
     96                 {
     97                     if (*_head >= *_tail && _totalLen - *_head - 1 <= length)
     98                     {
     99                         for (int i = offSize; i < end; i++)
    100                         {
    101                             *(_start + *_head) = *(pd + i);
    102                             if (*_head == _totalLen - 1)
    103                                 *_head = 0;
    104                             else
    105                                 (*_head)++;
    106                         }
    107                     }
    108                     else
    109                     {
    110                         MemCopy(pd + offSize, _start + *_head, length);
    111                         *_head += length;
    112                     }
    113                 }
    114             }
    115 
    116             private int GetInt32()
    117             {
    118                 byte[] lenArr = ReadBytes(4);
    119                 fixed (byte* p = lenArr)
    120                 {
    121                     return *(int*)p;
    122                 }
    123             }
    124 
    125             private void WriteInt32(int value)
    126             {
    127                 byte[] lenArr = new byte[4];
    128                 fixed (byte* p = lenArr)
    129                 {
    130                     *(int*)p = value;
    131                 }
    132 
    133                 WriteBytes(lenArr, 04);
    134             }
    135 
    136 
    137             [DllImport("msvcrt.dll", SetLastError = false)]
    138             static extern IntPtr memcpy(IntPtr dest, IntPtr src, int len);
    139             /// <summary>
    140             /// 比MemCopy2 快1/3
    141             /// </summary>
    142             private static void MemCopy(byte* src, byte* dest, int len)
    143             {
    144                 memcpy(new IntPtr(dest), new IntPtr(src), len);
    145             }
    146             /// <summary>
    147             /// 比循环Copy速度快10倍
    148             /// </summary>
    149             private static void MemCopy2(byte* src, byte* dest, int len)
    150             {
    151                 if (len >= 16)
    152                 {
    153                     do
    154                     {
    155                         *(long*)dest = *(long*)src;
    156                         *(long*)(dest + 8) = *(long*)(src + 8);
    157                         dest += 16;
    158                         src += 16;
    159                     }
    160                     while ((len -= 16) >= 16);
    161                 }
    162                 if (len > 0)
    163                 {
    164                     if ((len & 8) != 0)
    165                     {
    166                         *(long*)dest = *(long*)src;
    167                         dest += 8;
    168                         src += 8;
    169                     }
    170                     if ((len & 4) != 0)
    171                     {
    172                         *(int*)dest = *(int*)src;
    173                         dest += 4;
    174                         src += 4;
    175                     }
    176                     if ((len & 2) != 0)
    177                     {
    178                         *(short*)dest = *(short*)src;
    179                         dest += 2;
    180                         src += 2;
    181                     }
    182                     if ((len & 1) != 0)
    183                     {
    184                         byte* d = dest;
    185                         dest = d + 1;
    186                         byte* s = src;
    187                         src = s + 1;
    188                         *d = *s;
    189                     }
    190                 }
    191             }

    192         } 

    测试代码:

                Random rand=new Random();
                fixed (byte* p = new byte[1024 * 1024])
                {
                    LoopMemoryStream stream = new LoopMemoryStream(p, 1024 * 1024);
                    CodeTimer.Time("test"200, () =>
                        {
                            for (int i = 0; i < 10000; i++)
                            {
                                byte[] data = new byte[rand.Next(04096)];
                                stream.Write(data, 0, data.Length);
                                if (stream.Read().Length != data.Length)
                                    throw new ArgumentException();
                            }
                        });

                } 

    附:

    API 代码,.Net自带MMF库有些问题:

      public sealed class Win32API
        {
            [DllImport("Kernel32", CharSet = CharSet.Auto, SetLastError = true)]
            private static extern IntPtr CreateFileMapping(IntPtr hFile, IntPtr lpAttributes, FileMapProtection flProtect,
                Int32 dwMaxSizeHi, Int32 dwMaxSizeLow, string lpName);
    
            public static IntPtr CreateFileMapping(System.IO.FileStream File,
                FileMapProtection flProtect, Int64 ddMaxSize, string lpName)
            {
                int Hi = (Int32)(ddMaxSize / Int32.MaxValue);
                int Lo = (Int32)(ddMaxSize % Int32.MaxValue);
                return CreateFileMapping(File.SafeFileHandle.DangerousGetHandle(), IntPtr.Zero, flProtect, Hi, Lo, lpName);
            }
    
            public static IntPtr CreateFileMapping(SafeFileHandle handle,
        FileMapProtection flProtect, Int64 ddMaxSize, string lpName)
            {
                int Hi = (Int32)(ddMaxSize / Int32.MaxValue);
                int Lo = (Int32)(ddMaxSize % Int32.MaxValue);
                return CreateFileMapping(handle.DangerousGetHandle(), IntPtr.Zero, flProtect, Hi, Lo, lpName);
            }
    
            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern IntPtr OpenFileMapping(FileMapAccess DesiredAccess, bool bInheritHandle, string lpName);
    
            public static unsafe IntPtr Memcpy(byte* dest, byte* src, int len)
            {
                return memcpy(new IntPtr(dest), new IntPtr(src), len);
            }
    
            [DllImport("msvcrt.dll", SetLastError = false)]
            public static extern IntPtr memcpy(IntPtr dest, IntPtr src, int len);
    
            [Flags]
            public enum FileMapProtection : uint
            {
                PageReadonly = 0x02,
                PageReadWrite = 0x04,
                PageWriteCopy = 0x08,
                PageExecuteRead = 0x20,
                PageExecuteReadWrite = 0x40,
                SectionCommit = 0x8000000,
                SectionImage = 0x1000000,
                SectionNoCache = 0x10000000,
                SectionReserve = 0x4000000,
            }
    
            [DllImport("Kernel32", CharSet = CharSet.Auto, SetLastError = true)]
            private static extern IntPtr MapViewOfFile(IntPtr hFileMapping, FileMapAccess dwDesiredAccess, Int32 dwFileOffsetHigh, Int32 dwFileOffsetLow, Int32 dwNumberOfBytesToMap);
            public static IntPtr MapViewOfFile(IntPtr hFileMapping, FileMapAccess dwDesiredAccess, Int64 ddFileOffset, Int32 dwNumberOfBytesToMap)
            {
                int Hi = (Int32)(ddFileOffset / Int32.MaxValue);
                int Lo = (Int32)(ddFileOffset % Int32.MaxValue);
                return MapViewOfFile(hFileMapping, dwDesiredAccess, Hi, Lo, dwNumberOfBytesToMap);
            }
    
            [Flags]
            public enum FileMapAccess : uint
            {
                FileMapCopy = 0x0001,
                FileMapWrite = 0x0002,
                FileMapRead = 0x0004,
                FileMapAllAccess = 0x001f,
                fileMapExecute = 0x0020,
            }
    
            [DllImport("kernel32.dll")]
            public static extern bool FlushViewOfFile(IntPtr lpBaseAddress,
               Int32 dwNumberOfBytesToFlush);
    
            [DllImport("kernel32")]
            public static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
    
            [DllImport("kernel32", SetLastError = true)]
            public static extern bool CloseHandle(IntPtr hFile);
    
            [DllImport("kernel32.dll")]
            public static extern void GetSystemInfo([MarshalAs(UnmanagedType.Struct)] ref SYSTEM_INFO lpSystemInfo);
    
            [StructLayout(LayoutKind.Sequential)]
            public struct SYSTEM_INFO
            {
                public _PROCESSOR_INFO_UNION uProcessorInfo;
                public uint dwPageSize;
                public IntPtr lpMinimumApplicationAddress;
                public IntPtr lpMaximumApplicationAddress;
                public IntPtr dwActiveProcessorMask;
                public uint dwNumberOfProcessors;
                public uint dwProcessorType;
                public uint dwAllocationGranularity;
                public ushort dwProcessorLevel;
                public ushort dwProcessorRevision;
            }
    
            [StructLayout(LayoutKind.Explicit)]
            public struct _PROCESSOR_INFO_UNION
            {
                [FieldOffset(0)]
                public uint dwOemId;
                [FieldOffset(0)]
                public ushort wProcessorArchitecture;
                [FieldOffset(2)]
                public ushort wReserved;
            }
        }
      public unsafe class MMFMessageQueue
        {
            private volatile void* _shm;
            private volatile int* _lck;
            private volatile int* _head;
            private volatile int* _tail;
    
            private LoopMemoryStream _ms;
            private int _size;
            private int _realSize;
            private string _mmfName;
    
            public enum QueueResult
            {
                EMPTY,
                SUCCESS,
                FULL,
            }
    
            public int Count { get; set; }
    
            public MMFMessageQueue(string mmfName,int size)
            {
                _mmfName = mmfName;
                _size = size;
                _realSize = _size - sizeof(int*) * 3;
    
                _shm = GetOrCreateMMFView();
                _lck = (int*)_shm;
                _ms = new LoopMemoryStream((byte*)_shm + sizeof(int*), _size - sizeof(int*));
                //_ms.ClearData();//打开的同时清理
    
                _head = _ms.Head;
                _tail = _ms.Tail;
            }
    
            private void* GetOrCreateMMFView()
            {
                IntPtr mmf = Win32API.OpenFileMapping(Win32API.FileMapAccess.FileMapAllAccess, false, _mmfName);
                if (mmf == IntPtr.Zero)
                {
                    mmf = Win32API.CreateFileMapping(new SafeFileHandle(new IntPtr(-1), true), Win32API.FileMapProtection.PageReadWrite, _size, _mmfName);
                    if (mmf == IntPtr.Zero)
                        throw new Win32Exception();
                }
    
                IntPtr mvf = Win32API.MapViewOfFile(mmf, Win32API.FileMapAccess.FileMapWrite | Win32API.FileMapAccess.FileMapRead, 0, _size);
                if (mvf == IntPtr.Zero)
                    throw new Win32Exception();
    
                return mvf.ToPointer();
            }
    
            //SpinWait 每20次会有一次系统时间片切换
            //清理数据(挂的时候数据一致性是问题,全部删掉)
            //然后相当于获取锁往下执行
            //测试发现Count=10w时,wait时间为5s
            private void TryEnterLock()
            {
                SpinWait wait = new SpinWait();
                int head = *_head;
                int tail = *_tail;
                int count = 0;
    
                while (Interlocked.CompareExchange(ref *_lck, 1, 0) != 0)
                {
                    wait.SpinOnce();
    
                    count++;
                    if (head != *_head || tail != *_tail)
                    {
                        head = *_head;
                        tail = *_tail;
                        count = 0;
                    }
    
                    if (count > 100000)
                    {
                        Console.WriteLine("ClearData");
                        _ms.ClearData();
                        break;
                    }
                }
            }
    
            private void ExitLock()
            {
                *_lck = 0;
            }
            public QueueResult TryAppend(byte[] data)
            {
                return TryAppend(data, 0, data.Length);
            }
    
            public QueueResult TryAppend(byte[] data, int offsize, int length)
            {
                int realsize = 4 + length;
                if (realsize > _realSize)
                    throw new OverflowException();
    
                TryEnterLock();
    
                if (_ms.AvailLen < realsize)
                {
                    ExitLock();
                    return QueueResult.FULL;
                }
                else
                {
                    _ms.Write(data, 0, length);
                    ExitLock();
                    return QueueResult.SUCCESS;
                }
            }
    
            public QueueResult TryDequeue(out byte[] result)
            {
                result = null;
    
                TryEnterLock();
    
                if (_ms.IsEmpty)
                {
                    ExitLock();
                    return QueueResult.EMPTY;
                }
                else
                {
                    result = _ms.Read();
                    ExitLock();
                    return QueueResult.SUCCESS;
                }
            }
        }
        /// <summary>
        /// 每个通道两个MMF队列,一个发送,一个接收
        /// </summary>
        class RpcMMFMessageQueue
        {
            private const int NotityTimes = 1;
            private MMFMessageQueue _repQueue;
            private MMFMessageQueue _rspQueue;
            private Thread _thread;
            private EventWaitHandle _reqWait;
            private EventWaitHandle _rspWait;
            private RpcMMFMode _mode;
            private int _count;
    
            public Action<RpcMMFMessageQueue, byte[]> ReceiveData;
    
            public RpcMMFMessageQueue(string mmfName, RpcMMFMode mode)
            {
                _mode = mode;
                string reqName = string.Format("rpc_mmf_{0}_req", mmfName);
                string rspName = string.Format("rpc_mmf_{0}q_rsp", mmfName);
    
                _repQueue = new MMFMessageQueue(reqName, RpcMMFConfiguration.Current.MMFSize);
                _rspQueue = new MMFMessageQueue(rspName, RpcMMFConfiguration.Current.MMFSize);
    
                _reqWait = RpcMMFHelper.GetOrCreateWaitHandle(reqName+"_wait");
                _rspWait = RpcMMFHelper.GetOrCreateWaitHandle(rspName+"_wait");
    
                _thread = new Thread(DequeueProc);
                _thread.IsBackground = true;
                _thread.Start();
            }
    
            public void Enqueue(byte[] data)
            {
                 Enqueue(data, 0, data.Length);
            }
    
            public void Enqueue(byte[] data, int offsize, int length)
            {
                try
                {
                    var queue = _mode == RpcMMFMode.RpcClient ? _repQueue : _rspQueue;
                    if (queue.TryAppend(data, offsize, length) == MMFMessageQueue.QueueResult.FULL)
                    {
                        throw new RpcMMFException("MMF Queue Full");
                    }
                }
                catch (OverflowException)
                {
                    throw new RpcMMFException("MMF Queue Full");
                }
    
                if (Interlocked.Increment(ref _count) == NotityTimes)
                {
                    _count = 0;
                    if (_mode == RpcMMFMode.RpcClient)
                        _reqWait.Set();
                    else
                        _rspWait.Set();
                }
            }
    
            private void DequeueProc()
            {
                while (true)
                {
                    byte[] data;
                    var queue = _mode == RpcMMFMode.RpcServer ? _repQueue : _rspQueue;
    
                    if (queue.TryDequeue(out data) == MMFMessageQueue.QueueResult.EMPTY)
                    {
                        if (_mode == RpcMMFMode.RpcServer)
                            _reqWait.WaitOne(1);
                        else
                            _rspWait.WaitOne(1);
                    }
                    else
                    {
                        if (ReceiveData != null)
                            ReceiveData(this, data);
                    }
                }
            }
        }
     class RpcMMFHelper
        {
            public static EventWaitHandle GetOrCreateWaitHandle(string name)
            {
                EventWaitHandle set = TryCreateWaitHandle(name);
                if (set == null)
                    set = TryOpenWaitHandle(name);
    
                if (set == null)
                    throw new Exception(string.Format("can't open or create eventWaitHandle:{0}", name));
    
                return set;
            }
    
            public unsafe static byte[] GetRpcPacket<T>(T header, RpcBodyBuffer bodyBuffer)
            {
                MemoryStream hms = ProtoBufSerializer.Serialize<T>(header);
                short headLen = (short)hms.Length;
                int bodyLen = bodyBuffer == null ? 0 : bodyBuffer.GetSize();
                int totalLen = 2 + headLen + bodyLen;
    
                byte[] buffer = new byte[totalLen];
                fixed (byte* p = buffer)
                {
                    *(short*)p = headLen;
                    hms.Read(buffer, 2, headLen);
    
                    if (bodyLen > 0)
                    {
                        byte[] body = bodyBuffer.GetByteArray();
                        fixed (byte* src = body)
                        {
                            Win32API.Memcpy(p + 2 + headLen, src, bodyLen);
                        }
                    }
                }
                return buffer;
            }
    
            private static EventWaitHandle TryOpenWaitHandle(string name)
            {
                EventWaitHandle set = null;
                try
                {
                    set = (EventWaitHandle)EventWaitHandle.OpenExisting(name);
                }
                catch (WaitHandleCannotBeOpenedException ex) { }
    
                return set;
            }
    
            private static EventWaitHandle TryCreateWaitHandle(string name)
            {
                EventWaitHandle set = null;
                try
                {
                    set = new EventWaitHandle(false, EventResetMode.AutoReset, name);
                }
                catch (WaitHandleCannotBeOpenedException ex) { }
                
                return set;
            }
        }

     Rpc请求封包:

     /// <summary>
        /// 2byte head Length + header+ body
        /// </summary>
        unsafe class RpcMMFRequestPacket
        {
            private RpcRequest _request;
            private int _sequence;
    
            public RpcRequest Request { get { return _request; } }
            public int Sequence { get { return _sequence; } }
    
            public RpcMMFRequestPacket(byte[] data)
            {
                if (data.Length <= 2)
                    throw new ArgumentException();
                short headLen;
                fixed (byte* p = data)
                {
                    headLen = *(short*)p;
                }
    
                MemoryStream hms = new MemoryStream(data, 2, headLen);
                var header = ProtoBufSerializer.Deserialize<RpcMMFRequestHeader>(hms);
    
                var request = new RpcRequest();
                request.ContextUri = header.ContextUri;
                request.FromComputer = header.FromComputer;
                request.FromService = header.FromService;
                request.Method = header.Method;
                request.Service = header.Service;
                _sequence = header.Sequence;
    
                int bodyLen = data.Length - 2 - headLen;
                if (bodyLen > 0)
                {
                    MemoryStream bms = new MemoryStream(data, 2 + headLen, bodyLen);
                    request.SetBodyStream(bms, bodyLen);
                }
                else
                    request.BodyBuffer = RpcBodyBuffer.EmptyBody;
    
                _request = request;
            }
    
            public static void WriteRequest(RpcMMFMessageQueue queue, IRpcMMFSendingPacket packet)
            {
                RpcMMFRequestHeader header = packet.RequestHeader;
                byte[] buffer = RpcMMFHelper.GetRpcPacket<RpcMMFRequestHeader>(header, packet.BodyBuffer);
                queue.Enqueue(buffer);
            }
        }

    RRC 请求头描述信息:

        [ProtoContract]
        public class RpcMMFRequestHeader
        {
            [ProtoMember(1, IsRequired = true)]
            public int Sequence;
    
            [ProtoMember(2)]
            public string ContextUri;
    
            [ProtoMember(3)]
            public string FromComputer;
    
            [ProtoMember(4)]
            public string FromService;
    
            [ProtoMember(5)]
            public string Service;
    
            [ProtoMember(6)]
            public string Method;
    
            public unsafe void Serialize(Stream stream)
            {
                byte[] b = new byte[4];
                fixed (byte* p = b)
                    *(int*)p = Sequence;
                stream.Write(b, 0, 4);
                WriteStr(stream, ContextUri);
                WriteStr(stream, FromComputer);
                WriteStr(stream, FromService);
                WriteStr(stream, Service);
                WriteStr(stream, Method);
            }
    
            private static unsafe void WriteStr(Stream stream, string str)
            {
               
                byte len = 0;
                if (!string.IsNullOrEmpty(str))
                    len = (byte)str.Length;
    
                if (len > 0)
                {
                    byte[] data = UTF8Encoding.UTF8.GetBytes(str);
                    if (data.Length > byte.MaxValue)
                        throw new NotSupportedException();
    
                    len = (byte)data.Length;
                    stream.WriteByte(len);
                    stream.Write(data, 0, len);
                }
                else
                    stream.WriteByte(0);
            }
    
            private static unsafe string ReadStr(Stream stream)
            {
                byte len = (byte)stream.ReadByte();
                if (len == 0)
                    return null;
                byte[] data = new byte[len];
                stream.Read(data, 0, len);
               
                return UTF8Encoding.UTF8.GetString(data);
            }
    
            public MemoryStream Serialize()
            {
                MemoryStream ms = new MemoryStream();
                Serialize(ms);
                return ms;
            }
    
            public static unsafe RpcMMFRequestHeader Deserialize(Stream stream)
            {
                RpcMMFRequestHeader header = new RpcMMFRequestHeader();
                byte[] b = new byte[4];
                stream.Read(b, 0, 4);
                fixed (byte* p = b)
                    header.Sequence = *(int*)p;
    
                header.ContextUri = ReadStr(stream);
                header.FromComputer = ReadStr(stream);
                header.FromService = ReadStr(stream);
                header.Service = ReadStr(stream);
                header.Method = ReadStr(stream);
                return header;
            }
        }

    RPC 应答封包:

       /// <summary>
        ///  2byte head Length + header+ body
        /// </summary>
        unsafe class RpcMMFResponsePacket
        {
            private RpcResponse _response;
            private int _sequence;
            public int Sequence { get { return _sequence; } }
            public RpcResponse Response { get { return _response; } }
    
            public RpcMMFResponsePacket(byte[] data)
            {
                if (data.Length <= 2)
                    throw new ArgumentException();
                short headLen;
                fixed (byte* p = data)
                {
                    headLen = *(short*)p;
                }
    
                MemoryStream hms = new MemoryStream(data, 2, headLen);
                var header = ProtoBufSerializer.Deserialize<RpcMMFResponseHeader>(hms);
                int bodyLen = data.Length - 2 - headLen;
    
                RpcBodyBuffer body = null;
                if (bodyLen > 0)
                {
                    MemoryStream bs = new MemoryStream(data, 2 + headLen, bodyLen, false);
                    body = new RpcBodyBuffer(bs, bodyLen);
                }
                else
                    body = RpcBodyBuffer.EmptyBody;
    
                _sequence = header.Sequence;
                _response = new RpcResponse((RpcErrorCode)header.ResponseCode, body);
            }
    
            public static void WriteResponse(RpcMMFMessageQueue queue, IRpcMMFSendingPacket packet)
            {
                RpcMMFResponseHeader header = packet.ResponseHeader;
                byte[] buffer = RpcMMFHelper.GetRpcPacket<RpcMMFResponseHeader>(header, packet.BodyBuffer);
                queue.Enqueue(buffer);
            }
        }

    应答头:

     [ProtoContract]
        public class RpcMMFResponseHeader
        {
            [ProtoMember(1, IsRequired = true)]
            public int Sequence;
    
            [ProtoMember(2, IsRequired = true)]
            public int ResponseCode;
    
            [ProtoMember(3, IsRequired = true)]
            public int BodyLength;
        }

    事务管理:

      static class RpcMMFTransactionManager
        {
    private static Thread _thread;
            private static ConcurrentDictionary<int, RpcMMFClientTransaction> _dict;
            private static Stopwatch _watch;
            private static object _sync = new object();
            public static void Initialize()
            {
                if (_dict == null)
                    lock (_sync)
                    {
                        _watch = new Stopwatch();
                        _watch.Start();
    
                        _dict = new ConcurrentDictionary<int, RpcMMFClientTransaction>(16, 64 * 1024);
                        _thread = new Thread(MonitorProc);
                        _thread.IsBackground = true;
                        _thread.Start();
                    }
            }
    
            public static void BeginTransaction(RpcMMFClientTransaction tx)
            {
                tx.Tickets = _watch.ElapsedMilliseconds;
    
                if (!_dict.TryAdd(tx.Sequence, tx))
                {
                    tx.SendFailed(RpcErrorCode.SendFailed, null);
                    _tracing.ErrorFmt("sequence key same,{0}", ObjectHelper.DumpObject(tx.RequestHeader));
                }
            }
    
            public static void EndTransaction(int seq, RpcResponse response)
            {
    
                RpcMMFClientTransaction tx;
                if (_dict.TryRemove(seq, out tx))
                {
                  tx.Callback(response);
                }
                else
                {
                    _tracing.ErrorFmt("out of band sequence:{0},{1}", seq, ObjectHelper.DumpObject(response));
                }
            }
    
            private static void MonitorProc()
            {
                while (true)
                {
                    try
                    {
                        long currentTickets = _watch.ElapsedMilliseconds;
                        foreach (var kv in _dict)
                        {
                            if (kv.Value.Tickets + (long)kv.Value.Timeout < currentTickets)
                            {
                                RpcResponse rsp = new RpcResponse(RpcErrorCode.TransactionTimeout, null);
                                EndTransaction(kv.Key, rsp);
    
                                _tracing.Error("transation timeout");
                            }
                        }
                    }
                    catch (ThreadAbortException)
                    {
                        Thread.ResetAbort();
                    }
                    catch (Exception ex)
                    {
                        _tracing.ErrorFmt(ex, "MonitorProc Error");
                    }
    
                    Thread.Sleep(1000);
                }
            }
        }

    主要代码都贴上了,不是完整的Rpc,大家集成大自己Rpc 框架试试。

  • 相关阅读:
    JAVA应用apache httpclient探测http服务
    C#中字符串与byte[]相互转换
    C#中位、字节等知识
    #JAVA操作LDAP
    C#正则表达式判断字符串是否是金钱
    【IDEA】使用Maven骨架创建JavaWeb项目
    【IDEA】回退操作记录
    【SpringMVC】IDEA 不识别webapp的解决办法
    【Layui】16 表单元素 Form
    【Layui】15 日期时间选择器 Laydate
  • 原文地址:https://www.cnblogs.com/lulu/p/2560410.html
Copyright © 2011-2022 走看看