zoukankan      html  css  js  c++  java
  • Socket实现发布订阅

    片头语:

          由于工作需要,最近一直在蛋疼这些事情,实在分身不暇。原计划1周一篇的WPF系列,也就此搁置了。不过俗话说的好,生活就像强X,反抗不了,就去享受吧……

      在这段时间里,把一直不太懂的Socket编程一步一步看了一些,也算收获颇丰了。在这里给初学者推荐一篇学习Socket的博客,园子里著名高帅富JimmyZhang写的,通俗易懂由浅入深……也解答了我很多疑惑,在这里对博主的文采和原创谨慎表示感谢。

      传送门:http://www.cnblogs.com/JimmyZhang/archive/2008/09/07/1286300.html

    下边进入正题:

        发布订阅机制,跟观察者模式相同,WCF那篇里也有说,这里简单描述一下。

      客户端--注册-->服务

      服务--广播-->客户端

      实现发布订阅,需要在服务端维护已连接客户端列表,并且客户端与服务端之间需要建立可靠的长连接。

      使用Socket实现,服务端的Socket需要创建一个Listener用于监听客户端的连接。而客户端需要Connect到服务器之后,才可以进行消息的交换。

          这里盗一张图,表达一下Socket通讯的生命周期……见谅……

      

          

      我们先看服务端:

        服务端最主要的就是做了2件事,一个是维护一个已连接的客户端列表,另一个就是循环对客户端进行消息广播。

          服务器端的Socket通过Accept()阻塞线程,等待客户端的连接,获取客户端Socket对象。然而我们却不能使用同步的方式进行开发,因为我们除了时刻侦听客户端连接的同时,我们还需要做广播消息的动作,因此这里对Aceept的行为,我们选择使用异步进行实现。代码如下:

      

    PushServer
    /// <summary>
        /// 推送服务类
        /// </summary>
        public class PushServer : IPushServer
        {
            #region 基本参数
            /// <summary>
            /// 监听器
            /// </summary>
            TcpListener listener;
    
            /// <summary>
            /// 客户端列表
            /// </summary>
            List<TcpClient> clientList;
    
            /// <summary>
            /// 异步获取客户端连接回调
            /// </summary>
            AsyncCallback acceptTcpClientCallback;
    
    
            #endregion
    
            #region 构造函数
            public PushServer(string ip, int port)
            {
                //初始化侦听器
                listener = new TcpListener(IPAddress.Parse(ip), port);
                //初始化客户端列表
                clientList = new List<TcpClient>();
    
                //初始化异步操作委托
                acceptTcpClientCallback = new AsyncCallback(OnAcceptCallback);
            }
            #endregion
    
            #region StratListen
            /// <summary>
            /// 开始侦听并推送
            /// </summary>
            public void Start()
            {
                this.Start(0);
            }
    
            /// <summary>
            /// 开始侦听并推送
            /// </summary>
            /// <param name="block"></param>
            public void Start(int block)
            {
                //启动侦听列表
                listener.Start(block);
    
                Console.WriteLine("服务启动...");
    
                //异步操作解决,无法用同步
                listener.BeginAcceptTcpClient(acceptTcpClientCallback, null);
            }
    
            /// <summary>
            /// 异步获取客户端连接的回调
            /// </summary>
            /// <param name="ar"></param>
            private void OnAcceptCallback(IAsyncResult ar)
            {
                try
                {
                    lock (clientList)
                    {
                        TcpClient remoteClient = listener.EndAcceptTcpClient(ar);
    
                        //将客户端添加到列表中
                        clientList.Add(remoteClient);
    
                        Console.WriteLine("加入远程客户端{0}到列表中", remoteClient.Client.RemoteEndPoint);
    
                        //回调等待下一个客户端的加入
                        listener.BeginAcceptTcpClient(acceptTcpClientCallback, null);
                    }
    
                }
                catch (Exception)
                {
                    Console.WriteLine("发生异常,未加入客户端Socket");
                }
            }
            #endregion
    
            #region Send Message
    
            public void SendMessage(string message)
            {
                byte[] buffer = Encoding.UTF8.GetBytes(message);
    
                foreach (var remoteClient in clientList.ToArray())
                {
                    if (remoteClient.Connected)
                    {
                       
                        NetworkStream ns = null;
                        try
                        {
                            //获取流
                            ns = remoteClient.GetStream();
                            //写入数据到客户端缓冲区
                            ns.Write(buffer, 0, buffer.Length);
                            //清空缓冲区 全部发送到客户端
                            ns.Flush();
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine("Socket连接 {0} 出现异常,错误信息:\n{1}", remoteClient.Client.RemoteEndPoint, e.Message);
                            //从广播列表中移除
                            clientList.Remove(remoteClient);
    
                            //进行资源释放
                            if (ns != null)
                                ns.Dispose();
                            remoteClient.Close();
                        }
                    }
                    else
                    {
                        //移除
                        clientList.Remove(remoteClient);
    
                        Console.WriteLine("客户端{0}断开连接,已回收", remoteClient.Client.RemoteEndPoint);
                    }
                }
    
            }
    
            #endregion
        }

      PushServer这个类,封装了服务器的所有行为,我们在这里关注的是它的Start()方法。服务端的Listener 我们使用的是System.Net.Sockets命名空间下的TcpListener类。这个类是微软封装好的用于处理服务端Socket的类型。

      在Start()方法里,我们在建立了服务侦听之后,使用了BeginAcceptTcpClient方法。这是个异步的方法,会开启后台线程等待客户端的连接,不会阻塞主线程的执行。在方法的回调委托中,我们将获取到的客户端连接压入到我们的客户端列表中。在完成客户端的搜集之后,我们应该再次启用BeginAccpetTcpClient进行对客户端的监听,形成一个循环。这么做,是因为服务端每次只能Accept到一个客户端Socket对象,当处理完这个对象之后,需要再次Accept才能得到下一个连接的客户端对象。

      至于SendMessage方法就不多说了,就是通过NetworkStream向客户端写入数据即可。

      NetworkStream中数据的流向是双向的,即客户端的数据可以流向服务端,服务端的数据也可以流向客户端。

      NetworkStream有两个方法Read 与 Write 分别是对数据流的读写操作。

      其中Read方法是会阻塞线程的,也就是说当一端为Read时,线程是阻塞的,除非另一端使用Write写入了一段数据,否则会一直等待下去。

      同时Write方法在写入数据的时候,数据会立刻流向另一端的缓冲区中。

      在这里需要注意的是,在服务器与客户端交换数据的过程中,无论哪一方Close()了NetworkStream对象,都会关闭这个数据流。

      服务端的东西就这么多了,我们来看客户端。

      客户端:

      

    Client
      public class Client
        {
            #region 基本参数
            //为了方便操作,这里设置为全局变量
    
            private TcpClient client;
    
            private AsyncCallback receiveCallback;
    
            public event EventHandler<MessageReceiveEventArgs> MessageReceived;
    
            //设置缓冲区,8196字节
            byte[] buffer = new byte[8196];
    
            //网络流
            NetworkStream ns = null;
            #endregion
    
            #region
    
            public Client()
            {
                client = new TcpClient();
                receiveCallback = new AsyncCallback(OnReceived);
            }
    
            #endregion
    
            #region 连接
    
            #endregion
    
            public void Connect(string ip, int port)
            {
                try
                {
                    //尝试连接
                    client.Connect(IPAddress.Parse(ip), port);
    
                    //输出连接信息
                    Console.WriteLine("连接到远程服务器{0}成功", client.Client.RemoteEndPoint);
    
                    //获取数据流
                    ns = client.GetStream();
    
                    //异步读取信息
                    ns.BeginRead(buffer, 0, buffer.Length, receiveCallback, null);
                }
                catch (Exception e)
                {
                    Console.WriteLine("尝试连接远程服务器失败,错误信息:\n{0}", e.Message);
                }
            }
    
            #region 收取
            /// <summary>
            /// 异步收取回调
            /// </summary>
            /// <param name="ar"></param>
            private void OnReceived(IAsyncResult ar)
            {
                try
                {
                    //获取读取字节数
                    int readBytes = ns.EndRead(ar);
    
                    //如果读取到0字节,则是废包……直接跳出
                    if (readBytes != 0)
                    {
                        //编码
                        string msg = Encoding.UTF8.GetString(buffer, 0, readBytes);
    
                        //抛出事件
                        if (this.MessageReceived != null)
                        {
                            this.MessageReceived(this, new MessageReceiveEventArgs(msg));
                        }
                    }
                    //输出读取到的数据
                    Console.WriteLine("读取了{0}字节的数据", readBytes);
                    //清理数组 避免脏读
                    Array.Clear(buffer, 0, buffer.Length);
                    //递归读取下一条进入缓冲区的数据
                    ns.BeginRead(buffer, 0, buffer.Length, receiveCallback, null);
                }
                catch (Exception e)
                {
                    Console.WriteLine("异步收取失败....");
                    if (ns != null)
                        ns.Dispose();
                    client.Close();
                }
            }
    
            #endregion
        }

      由于客户端是各自独立的,不存在一对多的情况,因为客户端可以完全使用同步操作。本段代码中对NetWorkStream使用了异步操作,使用方式大同小异。就不多说了。

      当客户端Connect到服务端的时候,就会被存到列表中。这时候服务端就可以向客户端广播消息了。当服务端向NetWorkStream中写入数据的时候,客户端阻塞的NetWorkStream的Read()方法就开始读取数据并且在处理完毕后等待下一条消息的写入。

      总体实现大概就是这样,晚些会贴上完整代码。Socket实现发布订阅并不是很难,只要思想正确了,实现的方式,与WCF也是大同小异的。这里没有做粘包的处理,有兴趣的朋友可以自己完成。最近忙得焦头烂额,文章质量可能也不是很好,希望大家可以多多包涵 谢谢了。

      

      Demo地址:猛击这里

  • 相关阅读:
    Quartz.Net 作业调度后台管理系统,基于Extjs
    [备份]EntityFramework
    WebMisSharp升级说明,最新版本1.6.0
    AllPay(欧付宝)支付接口集成
    Paypal Rest Api自定义物流地址(跳过填写物流地址)
    根据IP获取国家
    ViewBag 找不到编译动态表达式所需的一种或多种类型,是否缺少引用?
    Extjs4 DateTimeField,日期时间控件完美版
    IOS Swift 训练
    .Net集成PayPal的Demo
  • 原文地址:https://www.cnblogs.com/ShadowLoki/p/2673099.html
Copyright © 2011-2022 走看看