zoukankan      html  css  js  c++  java
  • C#通讯框架改写

    现有项目是利用C#的socket与PLC进行实时通讯,PLC有两种通讯模式——常规采集&高频采集。

    其中常规采集大概在10ms左右发送一次数据,高频采集大概在2ms左右发送一次数据。

    现有代码框架:在与PLC进行连接时,通过建立委托并创建线程的方式,来循环读取数据

    //创建委托
    public delegate void PLC_HD_Receive(byte[] recv_data);
    
    public PLC_HD_Receive PLC_Recv_Delegate_HD;
    
    //给委托绑定方法
    PLC_Recv_Delegate_HD = new PLC_HD_Receive(PLC_Receive_Callback_HD);
    
    //创建线程 PLC_Thread_HD
    = new Thread(new ThreadStart(PLC_ReadThread_HD));
    PLC_Thread_HD.IsBackground
    = true; PLC_Thread_HD.Start();
    //在线程内调用委托
    this.BeginInvoke(this.PLC_Recv_Delegate_HD, new Object[] { recv_buffer_hd });

     只要连接PLC成功后,会一直在后台读取PLC发送来的数据,并解析数据

    现有问题:实时性和数据完整性不够,有些操作会导致socket断掉连接。

    计划:改写现有代码框架,加深对通讯的理解,和对实时数据流的处理。                  2019-5-22

    **************************************************************************************************************************************************

    思路:原有框架读取数据使用的是同步通信,出错时反馈TimeOut错误,先准备改成异步通信

     1                     SocketError socket_error;
     2 
     3                     while (total_length < recv_buffer_len_hd)
     4                     {
     5                        //同步接收数据
     6                         ret_length = m_socket_hd.Receive(recv_buffer_hd, total_length, data_left, SocketFlags.None, out socket_error);
     7                         if (socket_error == SocketError.TimedOut || socket_error == SocketError.Shutdown || socket_error == SocketError.ConnectionAborted || ret_length == 0)
     8                         {
     9                             // 网络不正常,委托退出接收线程
    10                             thread_id = 1;
    11                             this.Invoke(this.PLC_ExitThread_Delegate_HD, new Object[] { thread_id });
    12                             return;
    13                         }
    14                         total_length += ret_length;
    15                         data_left -= ret_length;
    16                     }  

    控制台异步输出数据

    首先搭建一个简单的winform窗口demo,实现控制台异步输出数据

    此处参考链接:https://blog.csdn.net/smartsmile2012/article/details/71172450 异步接收

    但网上搜到的大部分都是服务器接收,项目上的应用是客户端接收,做了一点修改

    搭建的过程中遇到了winform无法直接控制台输出,需要引用AllocConsole()和FreeConsole()

    此处参考链接:https://blog.csdn.net/b510030/article/details/52621312 WinForm添加Console

    在反复点击按钮的过程中发现,AllocConsole()最好在窗口构造函数中使用,否则多次调用AllocConsole()会导致Console.Readkey()报错

      1     public partial class Form1 : Form
      2     {
      3         //winform调用console窗口
      4         [DllImport("Kernel32.dll")]
      5         public static extern Boolean AllocConsole();
      6 
      7         [DllImport("Kernel32.dll")]
      8         public static extern Boolean FreeConsole();
      9         //socket模块
     10         IPAddress ip;
     11         Socket m_sokcet;
     12         IPEndPoint local_endpoint;
     13         byte[] buffer;
    19 public Form1() 20 { 21 buffer = new byte[8]; 22 InitializeComponent();
    25
    AllocConsole(); 26 } 27 28 private void button1_Click(object sender, EventArgs e) 29 { 30 ip = IPAddress.Parse("127.0.0.1"); 31 local_endpoint = new IPEndPoint(ip, 60000); 32 m_sokcet = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 33 m_sokcet.Connect(local_endpoint); 34 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 35 Console.ReadKey(); 36 } 71 void ReceiveCallback(IAsyncResult result) 72 { 73 Socket m_sokcet = (Socket)result.AsyncState; 74 m_sokcet.EndReceive(result); 75 result.AsyncWaitHandle.Close();
    91
    Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
    94 //清空数据,重新开始异步接收 95 buffer = new byte[buffer.Length]; 96 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 97 } 98 }

    服务端利用socketTool调试工具,发送数据后看控制台窗口的刷新情况,测试结果如下:

    测试结果OK

    **************************************************************************************************************************************************

     流式数据框架构思

     此处参考链接:https://www.csdn.net/article/2014-06-12/2820196-Storm 实时计算

    二.  实时计算的相关技术
    主要分为三个阶段(大多是日志流):
    数据的产生与收集阶段、传输与分析处理阶段、存储对对外提供服务阶段

    链接内说的是大数据和流式处理框架Storm,项目上还远远达不到大数据级别,所以只是参考一下思路。

    数据的产生:PLC,数据的接受:Socket,数据的存储:队列,数据的分析处理:解析数据,数据的对外服务:刷新UI

    框架思路有了,接下来就是具体实现

    **************************************************************************************************************************************************

    生产者-消费者模式和队列

    此处参考链接:https://www.cnblogs.com/samgk/p/4772806.html 队列

     1         //队列模块
     2         readonly static object _locker = new object();
     3         Queue<byte[]> _tasks = new Queue<byte[]>();
     4         EventWaitHandle _wh = new AutoResetEvent(false);
     5         Thread _worker;
     6 
     7         //窗口初始化时开始消费者线程
     8         public Form1()
     9         {
    10             buffer = new byte[8];
    11             InitializeComponent();
    12             _worker = new Thread(Work);
    13             _worker.Start();
    14             AllocConsole();
    15         }
    16 
    17        //加了锁和信号量
    18         void Work()
    19         {
    20             while (true)
    21             {
    22                 byte[] work = null;
    23                 lock (_locker)
    24                 {
    25                     if (_tasks.Count > 0)
    26                     {
    27                         work = _tasks.Dequeue(); // 有任务时,出列任务
    28 
    29                         if (work == null)  // 退出机制:当遇见一个null任务时,代表任务结束
    30                             return;
    31                     }
    32                 }
    33 
    34                 if (work != null)
    35                     SaveData(work);  // 任务不为null时,处理并保存数据
    36                 else
    37                     _wh.WaitOne();   // 没有任务了,等待信号
    38             }
    39         }
    40  
    41         //在异步接收的方法中把控制台输出修改为加入队列
    42         void EnqueueTask(byte[] task)
    43         {
    44             lock (_locker)
    45                 _tasks.Enqueue(task);  // 向队列中插入任务 
    46 
    47             _wh.Set();  // 给工作线程发信号
    48         }
    1                 //TODO 将收到的数据放入队列
    2                 EnqueueTask(buffer);
    3                 Thread.Sleep(10);
    4                 //Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
    5                 //
    1          void SaveData(byte[] buffer)
    2          {
    3             //从队列中取出数据           
    4             Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
    5          }

    这样就把数据先存入队列,再取出数据,通过控制台输出数据,实现了生产者-消费者模式和队列存储数据            2019-5-23

    **************************************************************************************************************************************************

     解析数据&刷新UI

     项目真正的业务需求是解析数据和刷新UI,所以我们需要把SaveData方法改造一下

     PLC会源源不断的输出数据,我们需要在接收到数据后对数据进行处理和刷新UI,不可能对每一个数据都进行处理

     而且项目不是大数据级别的,不使用数据库存放数据,纯粹的实时处理,我们需要定义一下处理数据的采集时间和UI的刷新时间

     原有框架的常规采集是16ms,高频采集是2ms,所以在测试阶段定义10ms采集一次,UI刷新500ms一次

    逻辑是在最后解析&刷新时间记录时间戳,和SaveData当前执行时间戳比较,大于10ms则解析,大于500ms则刷新

     1         int count_UI = 0;
     2         int count_Data = 0;
     3         float time_UI = 0F;
     4         float time_Data = 0F;
     5         float time_over_UI = 0F;
     6         float time_over_Data = 0F;
     7         /// <summary>处理保存</summary>
     8         bool SaveData(byte[] buffer)
     9         {
    11             //从队列中取出数据,解析并刷新UI
    13             //解析数据
    14             time_Data = Environment.TickCount - time_over_Data;
    15             time_UI = Environment.TickCount - time_over_UI;
    16             //if (time_Data > 10)//解析数据——10ms一次
    17             //{
    18             //解析数据函数
    19             count_Data++;
    23 Console.WriteLine("解析成功:{0},耗时{1}ms,序号:{2}", Encoding.ASCII.GetString(buffer), time_Data.ToString(), count_Data.ToString()); 24 time_over_Data = Environment.TickCount; 25 //} 26 27 //刷新UI——500ms一次 28 if (time_UI > 500) 29 { 30 //刷新UI函数 31 count_UI++;
    33 Console.WriteLine("刷新UI:{0},耗时{1}ms,序号:{2}", Encoding.ASCII.GetString(buffer), time_UI.ToString(), count_UI.ToString()); 34 time_over_UI = Environment.TickCount; 35 } 36 Thread.Sleep(200);// 模拟数据保存
    37 return true; 38 }

    使用SockeTool发送数据100次,会看到数据被过滤到了一部分

    测试到这里我对时间片有一点疑惑,查阅了一些资料和做了一些实际测试

     此处参考链接:https://zhidao.baidu.com/question/1051646628145878899.html 时间片

    socket处理数据流的速度非常快,如果不加10ms的过滤则每一条数据都会显示在控制台页面,如果加了10ms的过滤则只显示一部分,至于为什么大部分情况下是16ms,和线程调度有关

    我们现在把解析数据的函数和UI调用的函数放在指定的地方就可以实测了。

    **************************************************************************************************************************************************

    socket粘包&服务端断开连接异常&异步接收检测socket通断

    1、粘包——在测试过程中发现,如果buffer的大小与每次发送的数据不一致,会发生粘包现象。

    项目上PLC发送的数据固定为4096字节,所以和服务端保持一致即可。

    2、服务端连接断开——测试的另一个问题是如果服务端断开连接,客户端无法有效监测,回调函数会一直执行。

    3、监测通断——网上查了很多资料,利用select方法和poll方法的,试了一下没有效果,最后采用flag的方式成功在连接异常后终止回调函数

    EndReceive方法会反馈当前获取到的字节数,否则没有数据则为0,如果重复接收20次,每次延时100ms都没有为0,则判断为连接已断。

    项目是和PLC连接,和其他互联网应用有一定的差异。

     1         int flag_connect = 0;
     2         void ReceiveCallback(IAsyncResult result)
     3         {
    11             Socket m_sokcet = (Socket)result.AsyncState;
    12             int a = m_sokcet.EndReceive(result);
    13             result.AsyncWaitHandle.Close();
    14             if (a == 0)
    15             {
    16                 if (flag_connect == 20)
    17                 {
    18                     flag_connect = 0;
    19                     return;
    20                 }
    21                 flag_connect++;
    22                 Thread.Sleep(100);
    23             }
    24             else
    25             {
    27                 EnqueueTask(buffer);32             }
    33             //清空数据,重新开始异步接收
    34             buffer = new byte[buffer.Length];
    35             m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet);
    36         }

    **************************************************************************************************************************************************

    解析数据连接PLC实测

    上面的测试都是笔记本电脑上利用socketTool测试的,现在开始连接PLC做真实的数据解析测试

    1、测试遇到的问题是,如何断开解析数据线程和异步接收回调函数

    一开始直接使用的是Abort方法,但是效果不好,没有办法再次连接

    查询相关资料后,使用flag的方式来退出线程,使用信号量的方式来结束回调函数

    另外考虑到PLC是无限的数据流,对队列的最大数量做了一个限制,如果超过1000个则停止接收

    此处参考链接:https://blog.csdn.net/pc0de/article/details/52841458 Abort异常

    此处参考链接:https://blog.csdn.net/shizhibuyi1234/article/details/78202647 结束线程

    还有两个线程相关的,Mark一下日后学习

    https://www.cnblogs.com/doforfuture/p/6293926.html 线程池相关
    https://www.cnblogs.com/wjcnet/p/6955756.html  Task

    2、连接断开过程中的,队列内的数据处理。经过测试,最后还是采用信号量的方式

    在队列达到最大数量1000时,异步接收回调函数等待。

    在队列为空时,解析数据线程给异步接收回调函数发信号。

    另外,实测Queue为空时,调用Dequeue会报错队列为空。

    完整代码: 

      1     public partial class Form1 : Form
      2     {
      3         //winform调用console窗口
      4         [DllImport("Kernel32.dll")]
      5         public static extern Boolean AllocConsole();
      6 
      7         [DllImport("Kernel32.dll")]
      8         public static extern Boolean FreeConsole();
      9         //socket模块
     10         IPAddress ip;
     11         Socket m_sokcet;
     12         IPEndPoint local_endpoint;
     13         byte[] buffer;
     14         //队列模块
     15         readonly static object _locker = new object();
     16         Queue<byte[]> _tasks = new Queue<byte[]>();
     17         EventWaitHandle _wh;
     18         EventWaitHandle _recieve_call;
     19         Thread _worker;
     20         public Form1()
     21         {
     22             buffer = new byte[256];
     23             InitializeComponent();
     24             AllocConsole();
     25         }
     26 
     27         private void button1_Click(object sender, EventArgs e)
     28         {
     29             connect_status = true;
     30             if (_wh == null)//队列信号量
     31                 _wh = new AutoResetEvent(false);
     32             if (_recieve_call == null)//队列满或空信号量
     33                 _recieve_call = new AutoResetEvent(false);
     34             _worker = new Thread(Work);
     35             _worker.Start();
     36             if (m_sokcet == null)
     37             {
     38                 ip = IPAddress.Parse("169.254.11.22");//TODO IP修改
     39                 local_endpoint = new IPEndPoint(ip, 2001);//TODO 端口修改
     40                 m_sokcet = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
     41                 m_sokcet.Connect(local_endpoint);
     42             }
     43             m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet);
     44             //Console.ReadKey();
     45         }
     46 
     47         bool connect_status = false;
     48         int flag_connect = 0;
     49         void ReceiveCallback(IAsyncResult result)
     50         {
     51             if (_tasks.Count > 10000)
     52             {
     53                 //TODO 区分当前连接状态,执行wait还是return
     54                 _recieve_call.WaitOne();
     55                 //return;
     56             }
     57 
     58             Socket m_sokcet = (Socket)result.AsyncState;
     59             int a = m_sokcet.EndReceive(result);
     60             result.AsyncWaitHandle.Close();
     61             if (a == 0)//判断是否与服务端断开连接
     62             {
     63                 if (flag_connect == 20)
     64                 {
     65                     flag_connect = 0;
     66                     return;
     67                 }
     68                 flag_connect++;
     69                 Thread.Sleep(100);
     70             }
     71             else
     72             {
     73                 //TODO 将收到的数据放入队列
     74                 EnqueueTask(buffer);
     75                 //Thread.Sleep(1);
     76                 //Delay(1);
     77                 //Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
     78                 //
     79             }
     80             //清空数据,重新开始异步接收
     81             buffer = new byte[buffer.Length];
     82             m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet);
     83         }
     84 
     85         void Work()
     86         {
     87             bool result;
     88             while (connect_status)
     89             {
     90                 byte[] work = null;
     91                 lock (_locker)
     92                 {
     93                     if (_tasks.Count > 0)
     94                     {
     95                         work = _tasks.Dequeue(); // 有任务时,出列任务
     96                     }
     97                     else
     98                     {
     99                         _recieve_call.Set();
    100                         //return;
    101                     }
    102                 }
    103 
    104                 if (work != null)
    105                     result = SaveData(work);  // 任务不为null时,处理并保存数据
    106                 else
    107                     _wh.WaitOne();   // 没有任务了,等待信号
    108             }
    109         }
    110 
    111         /// <summary>插入任务</summary>
    112         void EnqueueTask(byte[] task)
    113         {
    114             lock (_locker)
    115                 _tasks.Enqueue(task);  // 向队列中插入任务 
    116 
    117             _wh.Set();  // 给工作线程发信号
    118         }
    119 
    120         int count_UI = 0;
    121         int count_Data = 0;
    122         float time_UI = 0F;
    123         float time_Data = 0F;
    124         float time_over_UI = 0F;
    125         float time_over_Data = 0F;
    126         /// <summary>处理保存</summary>
    127         bool SaveData(byte[] buffer)
    128         {
    129 
    130             //TODO 从队列中取出数据,解析并刷新UI
    131 
    132             //解析数据——全部解析并保存
    133             time_Data = Environment.TickCount - time_over_Data;
    134             time_UI = Environment.TickCount - time_over_UI;
    135             //if (time_Data > 10)
    136             //{
    137             //解析数据函数
    138             count_Data++;
    139             bool result = PLC_Receive_Callback_HD(buffer);
    140             //Console.WriteLine(count_Data.ToString() + "," + _tasks.Count.ToString() + "," + result.ToString());
    141             //Thread.Sleep(1);
    142             Console.WriteLine("解析成功:{0},耗时{1}ms,序号:{2}", Encoding.ASCII.GetString(buffer), time_Data.ToString(), count_Data.ToString());
    143             time_over_Data = Environment.TickCount;
    144             //}
    145 
    146             //刷新UI——500ms刷新一次
    147             if (time_UI > 500)
    148             {
    149                 //刷新UI函数
    150                 count_UI++;
    151                 //Console.WriteLine(count_UI.ToString() + "," + _tasks.Count.ToString() + "刷新UI成功");
    152                 Console.WriteLine("刷新UI:{0},耗时{1}ms,序号:{2}", Encoding.ASCII.GetString(buffer), time_UI.ToString(), count_UI.ToString());
    153                 time_over_UI = Environment.TickCount;
    154             }
    155             return true;
    156             //Thread.Sleep(200);  // 模拟数据保存
    157         }
    158 
    159         private void button2_Click(object sender, EventArgs e)
    160         {
    161             connect_status = false;
    162             if (_worker != null && _worker.IsAlive)
    163             {
    164                 _wh.Set();
    165                 //_worker.Join();
    166             }
    167         }

     最后加入了解析数据的函数,对4096个字节解析,但是把刷新UI全部屏蔽

    实测PLC_Receive_Callback_HD内900多行代码解析数据很快

    原打算采用异步调用方式调用解析数据函数,现在看来不需要,因为不涉及数据存储

    通讯框架基本改写完成,剩下的就是把刷新UI的函数加上去

    **************************************************************************************************************************************************

    总结:

    参考了网上的很多资料,实现了一个简单的异步通讯和生产者-消费者模式加队列存储,实际测试效果自己还是比较满意的

    果然用轮子不如造轮子,重复造轮子是提升技术的最好方法。                                                                                        2019-5-24

  • 相关阅读:
    xtoi (Hex to Integer) C function Nanoseconds Network
    Learning Scrapy | 王晨的博客
    Facebook搜索项目多名工程师均来自Google
    Beyond the C++ Standard Library: An Introduction to Boost: Björn Karlsson: 9780321133540: Amazon.com: Books
    归并排序 详解
    NewsFeed 3.0 发布,移植到 Python 3 开源中国 OSChina.NET
    对于拷贝构造函数和赋值构造函数的理解
    python 的os.fork()
    Install C++ Boost on Ubuntu
    石川的blog ,注意
  • 原文地址:https://www.cnblogs.com/frank-hang/p/10906227.html
Copyright © 2011-2022 走看看