zoukankan      html  css  js  c++  java
  • 【基础】利用thrift实现一个非阻塞带有回调机制的客户端

    假设读者对thrift有一定了解。

      客户端有时需要非阻塞的去发送请求,给定服务端一个请求,要求其返回一个计算结果。但是客户端不想等待服务端处理完,而是想发送完这个指令后自己去做其他事情,当结果返回时自动的去处理。

      比如举个形象点的例子:饭店的Boss让小弟A把本周店里的欠条收集起来放到自己桌子上,然后又告诉自己的小秘书坐在自己办公室等着小弟A把欠条拿过来,然后统计一下一共有多少,然后Boss自己出去半点事儿。

      Boss相当于client,小弟A相当于server,而小秘书相当于client端的回调函数(callback)。怎么讲呢?Boss不想等待小弟处理完,因为他老人家公务繁忙,还要去干别的呢。于是他把接下来处理欠条的任务托管给了小秘书,于是自己一个人出去了。

      OK,那么我们基本了解了整个工作流程,来看看实现的方法。thrift去实现client异步+回调的方法关键点在于:thrift生成的client中有个send_XXX()和recv_XXX()方法。send_XXX()相当于告知server去处理东西,可以立即返回;而调用recv_XXX就是个阻塞的方法了,直到server返回结果。所以,我们可以在主线程调用完send_XXX()之后,然后另开一个线程去调用send_XXX(),该线程在等到server回复后自动调用callback方法,对结果进行一些处理(当然callback在修改client状态时需要进行同步操作)。这样的模式下,我们可以做很多事情,比如分布式环境下的观察者模式。当然了需要注意的一点就是,各个线程接受到结果的顺序跟请求顺序不一定一样,因为server处理不通请求时间不通或者网络环境的影响都可能导致这种情形。所以如果你对接受这些结果时不是幂等操作时需要注意一下。

    thrift脚本:

    //只有一个方法,client发送一个消息,server换回一个消息
    service TestServ{
        string ping(1: string message),
    }
    

    server端采用TNBlockingServer实现

     1 #include "TestServ.h"
     2 
     3 #include <iostream>
     4 
     5 #include <thrift/protocol/TBinaryProtocol.h>
     6 #include <thrift/server/TNonblockingServer.h>
     7 #include <thrift/transport/TServerSocket.h>
     8 #include <thrift/transport/TBufferTransports.h>
     9 #include <thrift/concurrency/PosixThreadFactory.h>
    10 
    11 using namespace std;
    12 
    13 using namespace ::apache::thrift;
    14 using namespace ::apache::thrift::protocol;
    15 using namespace ::apache::thrift::transport;
    16 using namespace ::apache::thrift::server;
    17 using namespace ::apache::thrift::concurrency;
    18 
    19 using boost::shared_ptr;
    20 
    21 class TestServHandler : virtual public TestServIf {
    22  public:
    23   TestServHandler() {
    24     // Your initialization goes here
    25   }
    26 
    27   void ping(std::string& _return, const std::string& message) {
    28       _return = "hello, i am server! ";
    29       sleep(3);// do something time-consuming/ 这里我们在server端加一些耗时的操作
    30       cout<<"Request from client: "<<message<<endl;
    31   }
    32 
    33 };
    34 
    35 int main(int argc, char **argv) {
    36   int port = 9090;
    37 
    38   shared_ptr<TestServHandler> handler(new TestServHandler());
    39   shared_ptr<TProcessor> processor(new TestServProcessor(handler));
    40   shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());  
    41   shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15);  
    42   shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory > (new PosixThreadFactory());  
    43   threadManager->threadFactory(threadFactory);  
    44   threadManager->start();  
    45   TNonblockingServer server(processor, protocolFactory, port, threadManager);  
    46   server.serve();  
    47   return 0;
    48 }

    client端实现:

     1 #include "TestServ.h"
     2 
     3 #include <iostream>
     4 #include <thrift/protocol/TBinaryProtocol.h>
     5 #include <thrift/transport/TSocket.h>
     6 #include <thrift/transport/TBufferTransports.h>
     7 
     8 #include "test_constants.h"
     9 
    10 using namespace std;
    11 using namespace ::apache::thrift;
    12 using namespace ::apache::thrift::protocol;
    13 using namespace ::apache::thrift::transport;
    14 using boost::shared_ptr;
    15 
    16 class AsynTestClient;
    17 void * wait_recv(void * parg );
    18 struct PARG {
    19     AsynTestClient * pthis;
    20     string message;
    21 };
    22 
    23 class AsynTestClient {
    24 private:
    25     unsigned int d_cnt_recv;//< 客户端接受到server响应次数的计数器.
    26 
    27     pthread_rwlock_t m_cnt_recv;//< 计数器的读写锁.
    28     vector<pthread_t> m_ids;
    29 
    30 public:
    31     TestServClient * d_client;
    32     void call_back(string & _return){
    33     //输出服务器返回信息并把返回计数加1
    34     cout<<"server msg: "<<_return<<endl;
    35     pthread_rwlock_wrlock( &m_cnt_recv );
    36     d_cnt_recv ++;
    37     pthread_rwlock_unlock( &m_cnt_recv );
    38     }
    39     explicit AsynTestClient(boost::shared_ptr<TProtocol> & protocol){
    40     pthread_rwlock_init( &m_cnt_recv, NULL );
    41     d_cnt_recv = 0;
    42     d_client = new TestServClient( protocol );
    43     }
    44 
    45     ~AsynTestClient(){
    46     delete d_client;
    47     pthread_rwlock_destroy( &m_cnt_recv );
    48     }
    49 
    50     void asyn_ping( const string & message) {
    51     //发送请求
    52     d_client->send_ping(message);
    53     //初始化每个等待回调线程的参数
    54     PARG * parg = new PARG;
    55     parg->pthis = this;
    56     parg->message = message;
    57     //把新生成的线程id放入全局数组维护
    58     pthread_t m_id;
    59     m_ids.push_back(m_id);
    60     //启动线程,从此只要接受到服务器的返回结果就调用回调函数。
    61     if( 0 != pthread_create( &m_id, NULL, wait_recv, reinterpret_cast< void * > (parg) ) ) {
    62         return;
    63     }
    64     }
    65 };
    66 int main(int argc, char **argv) {
    67 
    68     boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));  
    69     boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));  
    70     boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));  
    71 
    72     //TestServClient client(protocol);  
    73 
    74     transport->open();  
    75     AsynTestClient client(protocol);
    76     string message = "hello, i am client! ";
    77     client.asyn_ping(message);
    78 
    79     while(true){
    80     sleep(1);//这里相当于client去做别的事情了
    81     }
    82 
    83     transport->close();  
    84     return 0;  
    85 }
    86 void * wait_recv(void * parg ) {
    87     PARG * t_parg = reinterpret_cast< PARG * >(parg);//强制转化线程参数
    88     string _return;
    89     t_parg->pthis->d_client->recv_ping(_return);
    90     t_parg->pthis->call_back(_return);
    91 }

      其实大家可以注意到,我并没有使用asyn_ping(const string & message, void(*)call_back(void));这种方式去定义它,这是因为asyn_ping本身可以获取callback函数的指针。回调的本质是任务的托管、时间的复用,也就是说等待结果返回后自动去调用一段代码而已,所以本质上上面就是回调机制。如果你想使用传函数指针的方式,也可以实现出来。

      注意:编译时需要-L$(LIB_DIR) -lthrift -lthriftnb -levent。

  • 相关阅读:
    Zuul
    熔断机制
    跨域问题
    过滤器
    从Ftp下载某一文件夹下的所有文件(三)
    java操作Ftp文件的一些方式(一)
    Java代码实现FTP单个文件下载(二)
    一些order
    Spring Boot
    利用dubbo服务对传统工程的改造
  • 原文地址:https://www.cnblogs.com/colorfulkoala/p/3487948.html
Copyright © 2011-2022 走看看