zoukankan      html  css  js  c++  java
  • 0MQ绑定Delphi版-说明

    0MQ绑定Delphi版

    这是一份ZMQ绑定。测试环境Delphi7,BDS2006,FPC 2.6.0(目前仅Window)。

    概述

    程序包中含有一个dll的wrapper(zmq.pas),和一个高层API(zmqapi.pas)。

    它应该能够工作于ZMQ 2.2.0,和3.2.0 rc1(实验性)。要使用v3.2的dll,在zmq.inc中定义zmq3({$define zmq3})。dll来自于官方发行版

    使用

    你应该使用高层API,那会节省你大量时间,而且附带效果是代码也将更容易阅读。

    首先,你应该创建一个上下文

    1  <!--lang: delphi-->
    2 context := TZMQContext.Create;

    有很多种套接字类型,参见指南,每种都有一个常量。要创建例如一个REP套接字,这么写:

    1 <!--lang: delphi-->
    2 socket := context.Socket( stRep );
    3  
    4 // 绑定套接字
    5 socket.bind( 'tcp://*:5555' );
    6  
    7 // 连接套接字
    8 socket.connect( 'tcp://localhost:5555' );

    API中有很多中方法来发送消息。你可以发送单份,或多份消息,阻塞或非阻塞(v3中称为dontwait不等待)模式。

     <!--lang: delphi-->
    
    // 阻塞模式(默认)发送一个字符串就像这么简单:
     
    socket.send( 'Hello' );
     
    // 或者用非阻塞模式
     
    socket.send( 'Hello', [rsfNoBlock] );
     
    // 这种情况下如果消息无法入队则将抛出一个EZMQException异常,
     
    // 异常类型为EAGAIN。
     
     
     
    // 从stream发送数据(别忘了设置stream的位置到读取位置)
     
    socket.send( stream, size );
     
     
     
    // 发送多段消息。
     
    // 多个字符串:
     
    socket.send( ['Hello','World'] );
     
     
     
    // 这等同于:
     
    socket.send( 'Hello', [rsfSndMore] );
     
    socket.send( 'World' );
     
     
     
    // 或者使用TStrings。
     
    tsl := TStringList.Create;
     
    tsl.Add( 'Hello' );
     
    tsl.Add( 'World' );
     
    socket.send( tsl );
     
    tsl.Free;

    接收消息就像这么容易:

     
    <!--lang: delphi-->
     
     
     
    msize := socket.recv( msg );
     
    // 新的消息到了msg中,而msize持有消息的长度
     
     
     
    // 到一个stream中
     
    msize := socket.recv( stream );
     
     
     
    // 读取多段消息
     
    tsl := TStringList.Create;
     
    mcount := socket.recv( tsl );
     
    // 这将会添加多个消息的部分到字符串列表,并返回接收的消息数量。
    CTRL+C的处理

    这是个小技巧。在Windows中信号处理与posix系统有所不同。阻塞调用不会接收到SIGINT,只会持续的阻塞。要克服这个问题,已安装的处理器终止了上下文,所以阻塞调用例如recvpoll,等等...将接收到ETERM。这仅在Windows。

    如果你将你的无限循环代码写为这样,你可以很干净的终止。

     1  <!--lang: delphi-->
     2 
     3 while not context.Terminated do
     4  
     5 try
     6  
     7 socket.recv( msg );
     8  
     9 except
    10  
    11 // 处理异常,或者
    12  
    13 context.Terminate;
    14  
    15 end;
    16  
    17 context.Free;

    轮询

    轮询可以工作在两种不同方式,让我们称第一种为同步,第二种为异步方式。异步版本创建了一个线程,在其中做轮询。

    • 同步

      <!--lang: delphi-->
      
      // 创建上下文 
      context := TZMQContext.Create; 
      socket := context.Socket( stDealer ); 
      socket.connect( address );
      
      // 创建轮询器。参数true告诉轮询器使用同步轮询 
      poller := TZMQPoller.Create( true );
      
      // 注册套接字。 
      poller.register( socket, [pePollIn] );
      
      timeout := 100; // 100ms 
      while not context.Terminated do 
      begin
      
       
      rc := poller.poll(timeout);
       
      if rc > 0 then
       
      do something...
      end;
      
      poller.Free; 
      socket.Free; 
      context.Free;
    • 异步方式

      这个实现使用了一种reactor模式,轮询器启动了一个新的线程,并在类和创建的线程之间创建了一对套接字连接。所以这个轮询器的实现不是线程安全的,不要在不同线程中去注册、反注册套接字。

    •  
      <!--lang: delphi-->
       
       
       
      procedure TMyClass.pollerEvent( socket: TZMQSocket; event: TZMQPollEvents );
       
      begin
       
      do something...
       
      end;
       
       
       
      // 创建上下文。
       
      context := TZMQContext.Create;
       
       
       
      socket := context.Socket( stDealer );
       
      socket.connect( address );
       
       
       
      // 创建轮询器。第二个参数可以为nil,此时轮询器会创建自己的上下文。
       
      poller := TZMQPoller.Create( false, context );
       
       
       
      poller.onEvent := pollerEvent;
       
       
       
      // 注册套接字。如果第三个参数为true,注册将阻塞直到套接字注册完毕。
       
      poller.register( socket, [pePollIn], false );

    监视套接字(仅在v3.2可用)

    <!--lang: delphi-->
     
    // 像这样定义一个回调
     
    procedure TMyClass.MonitorCallback( event: TZMQEvent );
     
    begin
     
    // do something.
     
    end;
     
    // 注册这个回调
     
    socket.RegisterMonitor( MonitorCallback, cZMQMonitorEventsAll );
     
    // MonitorCallback将会在由RegisterMonitor创建的分离的线程中被调用。
     
    // 你可以这么调用来反注册监视。
     
    socket.DeRegisterMonitor;

    Name

    zmq_socket_monitor - 注册一个监控回调函数

    Synopsis

    int zmq_socket_monitor (void *socket, char * *addr, int events);

    Description

    zmq_socket_monitor() 函数会产生一个PAIR类型的socket,用来把socket状态改变(事件)通过inproc://传输方式广播到制定的终结点(endpoint)上。

    消息包括两个帧,第一部分包含着事件ID和与其相关联的值。第二帧以字符串方式保存收到影响的终结点。

    第一帧的组织方式是:16 bit的事件ID和32 bit的事件值。

    事件ID和19:23值是按照本地的字节序排序的(即这个运行着的进程所在的机器)。在这两部分之间没有其它的数据。

    事件值必须根据根据当前的context和事件ID进行解析。更多细节请查看下面可支持的事件。

    只有方向确定的传输方式(tcp、ipc)才能支持此初始行为。

    支持的事件

    ZMQ_EVENT_CONNECTED:链接已建立

      当和远程的另一端的连接建立好的时候,ZMQ_EVENT_CONNECTED事件会被触发。同步和异步事件都会发生触发此事件。事件值是新连接的socket的FD。

    ZMQ_EVENT_CONNECT_DELAYED:同步连接失败,仍在进行重试

      当一个请求立即连接的尝试被延迟并且仍然在尝试的时候,此事件被触发。事件值没有意义。

    ZMQ_EVENT_CONNECT_RETRIED:尝试异步连接/重连

      当一个连接尝试被重连计时器捕获后此事件被触发。重连间隔根据所有的尝试情况进行计算。事件值是重连间隔。

    ZMQ_EVENT_LISTENING:socket已经绑定了某个地址,准备好接受连接请求

      当一个socket成功的绑定在一个端口上的时候此事件被触发。事件值是新绑定的socket的FD。

    ZMQ_EVENT_BIND_FAILED:socket无法绑定在这个地址上

      当一个socket无法绑定在给定的端口上时此事件被触发。事件值是绑定函数修改后的errno值。

    ZMQ_EVENT_ACCEPTED:连接请求被接受

      一个从远端到来的连接被一个绑定了地址的socket接受并建立了连接是会触发此事件。事件值是被接受socket的FD。

    ZMQ_EVENT_ACCEPT_FAILED:无法接受客户端的连接请求

      当一个连接请求试图连接另一个socket失败的时候会触发此事件。事件值是accept设置的errno值。

    ZMQ_EVENT_CLOSED:连接关闭

      当一个连接的底层描述符被关闭是会触发此事件。事件值是被关闭的socket的FD。此时这个FD已经被关闭了。

    ZMQ_EVENT_CLOSE_FAILED:连接无法被关闭

      当一个描述符无法被释放回OS的时候会触发此事件。注意:只对IPC socket有效。事件值是释放失败时设置的errno值。

    ZMQ_EVENT_DISCONNECTED:会话被破坏

      当流引擎(尤其是TCP、IPC)出现了崩溃的/被破坏的会话时,此事件被触发。事件值是socket的FD。

    Return value

    当函数zmq_socket_monitor() 执行成功时,返回0或者更大值。否则返回 -1,并且设置errno为下列指定值。

    Errors

      ETERM

        与被指定的socket关联的ZMQ context 被终结了。

      EPROTONOSUPPORT

        无法支持被请求的传输协议。监视socket必须要使用inproc://方式进行传输。

      EINVAL

        提供的地址节点不能用。

    Example

      监视一个REP socket的连接状态

    复制代码
     1 #include <stdio.h>
     2 #include <zmq.h>
     3 #include <pthread.h>
     4 #include <string.h>
     5 #include <assert.h>
     6 
     7 static int read_msg(void* s, zmq_event_t* event, char* ep)
     8 {
     9     int rc ;
    10     zmq_msg_t msg1;  // binary part
    11     zmq_msg_init (&msg1);
    12     zmq_msg_t msg2;  //  address part
    13     zmq_msg_init (&msg2);
    14     rc = zmq_msg_recv (&msg1, s, 0);
    15     if (rc == -1 && zmq_errno() == ETERM)
    16         return 1 ;
    17     assert (rc != -1);
    18     assert (zmq_msg_more(&msg1) != 0);
    19     rc = zmq_msg_recv (&msg2, s, 0);
    20     if (rc == -1 && zmq_errno() == ETERM)
    21         return 1;
    22     assert (rc != -1);
    23     assert (zmq_msg_more(&msg2) == 0);
    24     // copy binary data to event struct
    25     const char* data = (char*)zmq_msg_data(&msg1);
    26     memcpy(&(event->event), data, sizeof(event->event));
    27     memcpy(&(event->value), data+sizeof(event->event),       sizeof(event->value));
    28     // copy address part
    29     const size_t len = zmq_msg_size(&msg2) ;
    30     ep = memcpy(ep, zmq_msg_data(&msg2), len);
    31     *(ep + len) = 0 ;
    32     return 0 ;
    33 }
    34 
    35 // REP socket monitor thread
    36 static void *rep_socket_monitor (void *ctx)
    37 {
    38     zmq_event_t event;
    39     static char addr[1025] ;
    40     int rc;
    41     printf("starting monitor...
    ");
    42     void *s = zmq_socket (ctx, ZMQ_PAIR);
    43     assert (s);
    44     rc = zmq_connect (s, "inproc://monitor.rep");
    45     assert (rc == 0);
    46     while (!read_msg(s, &event, addr)) {
    47         switch (event.event) {
    48         case ZMQ_EVENT_LISTENING:
    49             printf ("listening socket descriptor %d
    ", event.value);
    50             printf ("listening socket address %s
    ", addr);
    51             break;
    52         case ZMQ_EVENT_ACCEPTED:
    53             printf ("accepted socket descriptor %d
    ", event.value);
    54             printf ("accepted socket address %s
    ", addr);
    55             break;
    56         case ZMQ_EVENT_CLOSE_FAILED:
    57             printf ("socket close failure error code %d
    ", event.value);
    58             printf ("socket address %s
    ", addr);
    59             break;
    60         case ZMQ_EVENT_CLOSED:
    61             printf ("closed socket descriptor %d
    ", event.value);
    62             printf ("closed socket address %s
    ", addr);
    63             break;
    64         case ZMQ_EVENT_DISCONNECTED:
    65             printf ("disconnected socket descriptor %d
    ", event.value);
    66             printf ("disconnected socket address %s
    ", addr);
    67             break;
    68         }
    69     }
    70     zmq_close (s);
    71     return NULL;
    72 }
    73 
    74 int main()
    75 {
    76     const char* addr = "tcp://127.0.0.1:6666" ;
    77     pthread_t thread ;
    78     //  Create the infrastructure
    79     void *ctx = zmq_init (1);
    80     assert (ctx);
    81     // REP socket
    82     void* rep = zmq_socket (ctx, ZMQ_REP);
    83     assert (rep);
    84     // REP socket monitor, all events
    85     int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
    86     assert (rc == 0);
    87     rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx);
    88     assert (rc == 0);
    89     rc = zmq_bind (rep, addr);
    90     assert (rc == 0);
    91     // Allow some time for event detection
    92     zmq_sleep (1);
    93     // Close the REP socket
    94     rc = zmq_close (rep);
    95     assert (rc == 0);
    96     zmq_term (ctx);
    97     return 0 ; 
    98 }

    示例

    示例位于 zguide examples/Delphi 目录.

    变更

    • New poller class
    • poll function of TZMQPoller has a new optional parameter "pollCount".
    • Upgrade dll-s to v3.2.2 RC2
    • New monitoring logic implemented.
    • Default ZMQ version for the binding is now 3.2 ( can switch back to 2.2 by not defining zmq3 in thezmq.inc file )

    作者

    以下为项目的贡献人员:

    1.  
      Balazs Varga <bb.varga@gmail.com>
    2.  
      Stathis Gkotsis <stathis.gkotsis@gmail.com>
    3.  
      Stephane Carre <scarre.lu@gmail.com>

    版权

    遵循 GNU Lesser General Public License (LGPL) 条款将被授权自由使用此软件。细节请参看包含在发行内容中的文件COPYING.LESSER

  • 相关阅读:
    各种算法七
    各种算法六
    使用URLConnection调用axis1.4开发的webservice
    JDBC结果集rs.next()注意事项
    URLConnection调用接口
    axis1.4开发webservice客户端(快速入门)-基于jdk1.4
    axis1.4开发webservice服务端(快速入门)-基于jdk1.4
    FMDB数据库的简单实用
    Xcode5 取消项目ARC,或者单个类ARC切换
    用CornerStone配置SVN,HTTP及svn简单使用说明
  • 原文地址:https://www.cnblogs.com/h2zZhou/p/13177256.html
Copyright © 2011-2022 走看看