之前看wcf服务的时候看到wcf有支持管道通信协议,之前不知道,最近刚好有用到这个,这里写个简单实例
.net有已经封装好的pip通信的对象NamedPipeServerStream 和NamedPipeClientStream对象,底层应该还是调用C++实现的api实现的
对服务端和客户端做个简单的封装方便调用:
server:
public class PipServer:Log { public Action<string> ReceiveEvent; NamedPipeServerStream m_pipServer; AutoResetEvent monitor = new AutoResetEvent(false); Thread m_thread; bool run = true; string servname; public PipServer(string name) { m_pipServer = new NamedPipeServerStream(name,PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); servname = name; } public void Listen() { try { m_thread = new Thread(() => { WaitConnect(); }); m_thread.Start(); } catch (Exception ex) { P(ex, "[PipServer.WaitForConnect]"); } } void WaitConnect() { AsyncCallback callback = null; callback = new AsyncCallback(ar => { var pipeServer = (NamedPipeServerStream)ar.AsyncState; pipeServer.EndWaitForConnection(ar); Accept(); pipeServer.Disconnect(); m_pipServer.BeginWaitForConnection(callback, m_pipServer); }); m_pipServer.BeginWaitForConnection(callback, m_pipServer); } void Accept() { try { var res = Read(); if(!string.IsNullOrEmpty(res)) ReceiveEvent?.Invoke(res); } catch(Exception ex) { P(ex, "[PipServer.Accept]"); } } public bool Send(string msg) { try { var buf = Encoding.UTF8.GetBytes(msg); if (m_pipServer.CanWrite) { m_pipServer.Write(buf, 0, buf.Length); m_pipServer.Flush(); return true; } return false; } catch (Exception ex) { P(ex, "[PipServer.Send]"); return false; } } public string Read() { try { if (m_pipServer.CanRead) { int count = 0; List<byte> data = new List<byte>(); byte[] buf = new byte[1024]; do { count=m_pipServer.Read(buf, 0, buf.Length); if (count == buf.Length) { data.AddRange(buf); } else { var dst = new byte[count]; Buffer.BlockCopy(buf, 0, dst, 0, count); data.AddRange(dst); } } while (count > 0&&m_pipServer.CanRead); var res = Encoding.UTF8.GetString(data.ToArray()); return res; } return null; } catch (Exception ex) { P(ex, "[PipServer.Read]"); return null; } } public void Close() { run = false; m_thread.Join(); if (m_pipServer.IsConnected) { m_pipServer.Close(); } } }
client:
public class PipClient:Log { string serv; public PipClient(string server) { serv = server; } public bool Send(string msg) { try { var buf = Encoding.UTF8.GetBytes(msg); NamedPipeClientStream pipclient = new NamedPipeClientStream(serv); pipclient.Connect(3000); if (pipclient.CanWrite) { pipclient.Write(buf, 0, buf.Length); pipclient.Flush(); pipclient.Close(); return true; } return false; } catch (Exception ex) { P(ex, "[PipClient.Send]"); return false; } } }
log类写了一个简单日志打印类,集成下方便打印日志,可以直接去掉继承,吧日志打印去掉:
public class Log { public void L(string msg) { Console.WriteLine(msg); } public void L(string format, params string[] data) { Console.WriteLine(string.Format(format,data)); } public void P(Exception ex, string format, params string[] data) { var msg = string.Format(format, data); Console.WriteLine(string.Format("{0}:{1},{1}", msg, ex.Message, ex.StackTrace)); } }
调用实例:
static void PipTest() { Thread thread = new Thread(() => { PipServer pip = new PipServer("TEST_PIP"); pip.ReceiveEvent += s => { w(string.Format("receive:{0}",s)); }; pip.Listen(); }); thread.Start(); bool send = true; int count = 0; AutoResetEvent monitor = new AutoResetEvent(false); Thread client = new Thread(() => { PipClient ct = new PipClient("TEST_PIP"); while (send) { string msg = string.Format("这是第{0}条数据", count); w(msg); ct.Send(msg); count++; if (monitor.WaitOne(1000)) { break; } } }); client.Start(); while (true) { var input = Console.ReadLine(); if (input == "q" || input == "Q") { send = false; monitor.Set(); break; } } }
运行时,是客户端向服务端每隔一秒发送一次数据
有几个要注意的点:
1 要注意编码方式,怎么编码就怎么解码,最好是要有固定编码,不要直接写string,因为如果是不同的语言和不同平台实现的类,可能default对应的编码方式是不一样的,这样会造成读取乱码
2 这里可以用streamreader来读取,但是不要用readend这种写法,如果发送方不及时调用close方法,这样写会一直卡住,调用flush也没用
3 这里初始化只传入了servername,实际底层的地址是\\.\pipe\TEST_PIP,调试的时候下个断点可以看到的,如果用C++写的话,直接调用API传入的地址就是全名,到C#这边会自动被解析
4 可以再传入的信息上做一些文章,加上ID,发送方和接收方,这样可以实现类似回调的功能,这个是支持双向通信的,这里只有单向
5 类库是支持同步和异步的,这里是异步的等待连接,同步的读取,但是貌似没有直接支持4.5await写法的方法,只有AsyncCallback的写法