zoukankan      html  css  js  c++  java
  • Gearman分布式任务处理系统(八)开发讲解

    这篇重点介绍C-Lib库及client和worker的开发,以0.14版libgearman for C来讲解

    Client API

     

    client初始化&析构

    gearman_client_st *gearman_client_create(gearman_client_st *client)

    void gearman_client_free(gearman_client_st *client)

    gearman_return_t gearman_client_add_server(gearman_client_st *client, const char *host, in_port_t port);

    gearman_return_t gearman_client_add_servers(gearman_client_st *client, const char *servers); (一次添加多个gearman job-server)

     

    同步操作

    void * gearman_client_do(gearman_client_st *client, const char *function_name, const char *unique, const void *workload, size_t workload_size, size_t *result_size, gearman_return_t *ret_ptr);

    其中:

    i. unique: 的作用是client添加job给worker的时候的一个唯一标识,可选,默认是NULL

    ii. workload & workload_size指代执行任务的详细参数及其大小

    iii. result_size [out],指代返回数据的大小

    iv. ret_ptr [out], 指代gearman的返回status,以下是官方对于返回status的一些说明:

    In the case of GEARMAN_WORK_DATA, GEARMAN_WORK_WARNING, or GEARMAN_WORK_STATUS, the caller should take any actions to handle the event and then call this function again. This may happen multiple times until a GEARMAN_WORK_ERROR, GEARMAN_WORK_FAIL, or GEARMAN_SUCCESS (work complete) is returned. For GEARMAN_WORK_DATA or GEARMAN_WORK_WARNING, the result_size will be set to the intermediate data chunk being returned and an allocated data buffer will be returned. For GEARMAN_WORK_STATUS, the caller can use gearman_client_do_status() to get the current tasks status.

    总而言之,只有GEARMAN_WORK_ERROR/GEARMAN_WORK_FAIL/GEARMAN_SUCCESS才是三个最终的返回结果,其他的只是临时中间结果,需要进一步调用接受结果的函数(感觉中间结果只有在异步调用过程中才会出现)

    v. 输出是返回数据的起始地址,一旦用户用完之后,必须free,否则会出现内存泄露。

     

    void gearman_set_timeout(gearman_universal_st *gearman, int timeout);

    设置gearman_client_do多长时调用无返回则超时时间

     

    异步callback操作

    Gearman通过使用gearman_client_add_task()来望gearman_client_st中添加task,通过gearman_client_set_created_fn() / gearman_client_set_complete_fn()等来注册callback function,通过gearman_client_run_tasks()来运行gearman_client_st中的task。

     

    异步background操作

    系统在background运行job,client定期获得job运行结果,如果成功则返回,反之则继续等待。

    gearman_return_t gearman_client_do_background(gearman_client_st *client, const char *function_name, const char *unique, const void *workload, size_t workload_size, char *job_handle);

    i. job_handle [out]: 一个job的标识符

    ii. 输出:返回状态

    * gearman_return_t gearman_client_job_status(gearman_client_st *client, gearman_job_handle_t job_handle, bool *is_known, bool * is_running, uint32_t *numerator, uint32_t *denominator);

    * 用户获得在background执行的job的状态

    i. is_known [out]: Optional parameter to store the known status in

    ii. is_running [out]: Optional parameter to store the running status in

    iii. numerator [out]: Optional parameter to store the numerator in

    iv. denominator [out]: Optional parameter to store the denominator in

    PS: 好像background操作不怎么好使,不知道如何通过获得background的运行结果,这个是我一直困惑的

     

    gearman_client_st的一些属性

    gearman_client_st一共有以下3种运行属性:

    i. GEARMAN_CLIENT_NON_BLOCKING: client运行在non-blocking mode

    ii. GEARMAN_CLIENT_FREE_TASKS: 在task执行完成之后,自动的释放task

    iii. GEARMAN_CLIENT_UNBUFFERED_RESULT: Allow the client to read data in chunks rather than have the library buffer the entire data result and pass that back。

    可以通过函数gearman_client_add_options() / gearman_client_remove_options() / gearman_client_has_option() 等进行属性添加/删除/判断等

     

    Worker API

    /**
     * Initialize a worker structure. Always check the return value even if passing
     * in a pre-allocated structure. Some other initialization may have failed. It
     * is not required to memset() a structure before providing it.
     *
     * @param[in] worker Caller allocated structure, or NULL to allocate one.
     * @return On success, a pointer to the (possibly allocated) structure. On
     *  failure this will be NULL.
     */
    GEARMAN_API
    gearman_worker_st *gearman_worker_create(gearman_worker_st *worker);


    /**
     * Free resources used by a worker structure.
     *
     * @param[in] worker Structure previously initialized with
     *  gearman_worker_create() or gearman_worker_clone().
     */
    GEARMAN_API
    void gearman_worker_free(gearman_worker_st *worker);


    /**
     * Add a job server to a worker. This goes into a list of servers that can be
     * used to run tasks. No socket I/O happens here, it is just added to a list.
     *
     * @param[in] worker Structure previously initialized with
     *  gearman_worker_create() or gearman_worker_clone().
     * @param[in] host Hostname or IP address (IPv4 or IPv6) of the server to add.
     * @param[in] port Port of the server to add.
     * @return Standard gearman return value.
     */
    GEARMAN_API
    gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
                                               const char *host, in_port_t port);


    /**
     * Add a list of job servers to a worker. The format for the server list is:
     * SERVER[:PORT][,SERVER[:PORT]]...
     * Some examples are:
     * 10.0.0.1,10.0.0.2,10.0.0.3
     * localhost LIBGEARMAN_BITFIELD234,jobserver2.domain.com:7003,10.0.0.3
     *
     * @param[in] worker Structure previously initialized with
     *  gearman_worker_create() or gearman_worker_clone().
     * @param[in] servers Server list described above.
     * @return Standard gearman return value.
     */
    GEARMAN_API
    gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
                                                const char *servers);
                                                
    /**
     * Register and add callback function for worker. To remove functions that have
     * been added, call gearman_worker_unregister() or
     * gearman_worker_unregister_all().
     *
     * @param[in] worker Structure previously initialized with
     *  gearman_worker_create() or gearman_worker_clone().
     * @param[in] function_name Function name to register.
     * @param[in] timeout Optional timeout (in seconds) that specifies the maximum
     *  time a job should. This is enforced on the job server. A value of 0 means
     *  an infinite time.
     * @param[in] function Function to run when there is a job ready.
     * @param[in] context Argument to pass into the callback function.
     * @return Standard gearman return value.
     */
    GEARMAN_API
    gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
                                                 const char *function_name,
                                                 uint32_t timeout,
                                                 gearman_worker_fn *function,
                                                 void *context);


    /**
     * Wait for a job and call the appropriate callback function when it gets one.
     *
     * @param[in] worker Structure previously initialized with
     *  gearman_worker_create() or gearman_worker_clone().
     * @return Standard gearman return value.
     */
    GEARMAN_API
    gearman_return_t gearman_worker_work(gearman_worker_st *worker);

     

    /**
     * See gearman_universal_set_timeout() for details.
     */
    GEARMAN_API
    void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout);

     

    开发实例

    下面这个实例程序是,jfy_client发送test,jfy_worker返回test->result

     

    1. /*  
    2.   gearman client 测试程序  
    3.   gcc -o jfy_client jfy_client.c -I/usr/local/gearman/include -L/usr/local/gearman/lib -lgearman  
    4.   ./jfy_client "this is a test"  
    5. */  
    6.   
    7. #include <stdio.h>  
    8. #include <stdlib.h>  
    9. #include <string.h>  
    10. #include <unistd.h>  
    11.   
    12. #include <libgearman/gearman.h>  
    13.   
    14. static void usage(char *name);  
    15.   
    16. int main(int argc, char *argv[])  
    17. {  
    18.   int i;  
    19.   gearman_return_t ret;  
    20.   gearman_client_st client;  
    21.   char *result;  
    22.   size_t result_size;  
    23.   uint32_t numerator;  
    24.   uint32_t denominator;  
    25.   
    26.   char *host = "localhost", *port = "4730";  
    27.   
    28.   if (gearman_client_create(&client) == NULL)  
    29.   {  
    30.     fprintf(stderr, "Memory allocation failure on client creation ");  
    31.     exit(1);  
    32.   }  
    33.   
    34.   gearman_client_set_options(&client, GEARMAN_CLIENT_FREE_TASKS);  
    35.   gearman_client_set_timeout(&client, 15000);  
    36.   
    37.   ret= gearman_client_add_server(&client, host, atoi(port));  
    38.   if (ret != GEARMAN_SUCCESS)  
    39.   {  
    40.     fprintf(stderr, "%s ", gearman_client_error(&client));  
    41.     exit(1);  
    42.   }  
    43.   
    44.   for (i=0;i<10;i++)  
    45.   {  
    46.     result= (char *)gearman_client_do(&client, "jfytest", NULL,  
    47.                                       (void *)argv[1],  
    48.                                       (size_t)strlen(argv[1]),  
    49.                                       &result_size, &ret);  
    50.     if (ret == GEARMAN_WORK_DATA)  
    51.     {  
    52.       printf("Data=%.*s ", (int)result_size, result);  
    53.       free(result);  
    54.     }  
    55.     else if (ret == GEARMAN_WORK_STATUS)  
    56.     {  
    57.       gearman_client_do_status(&client, &numerator, &denominator);  
    58.       printf("Status: %u/%u ", numerator, denominator);  
    59.     }  
    60.     else if (ret == GEARMAN_SUCCESS)  
    61.     {  
    62.       char result2[1024];  
    63.       strncpy(result2, result, result_size);  
    64.       result2[result_size] = 0;  
    65.       printf("result_size=%d,result=%s= ", (int)result_size, result2);  
    66.       free(result);  
    67.     }  
    68.     else if (ret == GEARMAN_WORK_FAIL)  
    69.       fprintf(stderr, "Work failed ");  
    70.     else if (ret == GEARMAN_TIMEOUT)  
    71.     {  
    72.       fprintf(stderr, "Work timeout ");  
    73.     } else {  
    74.       fprintf(stderr, "%d,%s ", gearman_client_errno(&client), gearman_client_error(&client));  
    75.     }  
    76.     printf("sleep 5s ... ");  
    77.     sleep(5);  
    78.   }  
    79.   
    80.   gearman_client_free(&client);  
    81.   
    82.   return 0;  
    83. }  
    1. /*  
    2.   gearman worker 测试程序  
    3.   gcc -o jfy_worker jfy_worker2.c -I/usr/local/gearman/include -L/usr/local/gearman/lib -lgearman  
    4.   ./jfy_worker ./jfy_worker.tr  
    5. */  
    6.   
    7. #include <errno.h>  
    8. #include <stdio.h>  
    9. #include <stdlib.h>  
    10. #include <string.h>  
    11. #include <unistd.h>  
    12.   
    13. #include <libgearman/gearman.h>  
    14.   
    15. static void *jfytest(gearman_job_st *job, void *context, size_t *result_size,  
    16.                      gearman_return_t *ret_ptr);  
    17.   
    18. int main(int argc, char *argv[])  
    19. {  
    20.   gearman_return_t ret;  
    21.   gearman_worker_st worker;  
    22.   
    23.   char *host = "localhost", *port = "4730";  
    24.   
    25.   if (gearman_worker_create(&worker) == NULL)  
    26.   {  
    27.     printf("%s ", gearman_worker_error(&worker));  
    28.     exit(1);  
    29.   }  
    30.   
    31.   ret= gearman_worker_add_server(&worker, host, atoi(port));  
    32.   if (ret != GEARMAN_SUCCESS)  
    33.   {  
    34.     printf("%s ", gearman_worker_error(&worker));  
    35.     exit(1);  
    36.   }  
    37.   
    38.   ret= gearman_worker_add_function(&worker, "jfytest", 0, jfytest, NULL);  
    39.   if (ret != GEARMAN_SUCCESS)  
    40.   {  
    41.     printf("%s ", gearman_worker_error(&worker));  
    42.     exit(1);  
    43.   }  
    44.   
    45.   printf("wait job ... ");  
    46.   while (1)  
    47.   {  
    48.     ret= gearman_worker_work(&worker);  
    49.     if (ret != GEARMAN_SUCCESS)  
    50.     {  
    51.       printf("%s ", gearman_worker_error(&worker));  
    52.       break;  
    53.     }  
    54.   }  
    55.   
    56.   gearman_worker_free(&worker);  
    57.   
    58.   return 0;  
    59. }  
    60.   
    61. static void *jfytest(gearman_job_st *job, void *context, size_t *result_size,  
    62.                      gearman_return_t *ret_ptr)  
    63. {  
    64.   const uint8_t *workload;  
    65.   char *request,*result;  
    66.   
    67.   workload= gearman_job_workload(job);  
    68.   *result_size= gearman_job_workload_size(job);  
    69.   
    70.   request= malloc(1024);  
    71.   if (result == NULL)  
    72.   {  
    73.     printf("malloc request:%d ", errno);  
    74.     *ret_ptr= GEARMAN_WORK_FAIL;  
    75.     return NULL;  
    76.   }  
    77.   
    78.   snprintf((char *)request, *result_size+1, "%s", (char *)workload);  
    79.   printf("job=%s,result_size=%d,request=%s ", gearman_job_handle(job),*result_size,request);  
    80.   
    81.   result= malloc(1024);  
    82.   if (result == NULL)  
    83.   {  
    84.     printf("malloc result:%d ", errno);  
    85.     *ret_ptr= GEARMAN_WORK_FAIL;  
    86.     return NULL;  
    87.   }  
    88.   
    89.   *ret_ptr= GEARMAN_SUCCESS;  
    90.   
    91.   sprintf((char *)result, "%s->result", (char *)request);  
    92.   *result_size= strlen((char *)result);  
    93.   
    94.   printf("job=%s,result_size=%d,result=%s ", gearman_job_handle(job),*result_size,result);  
    95.   
    96.   return result;  
    97. }  

     

    下面的实例是PHP程序,客户端发送"hello!"

    worker端是两个程序,一个是阻塞方式的,一个是非阻塞方式的

    1. <?php  
    2. /* 
    3.  * send "Hello!" 
    4.  */  
    5.   
    6. echo "Starting ";  
    7.   
    8. # Create our client object.  
    9. $gmclientnew GearmanClient();  
    10.   
    11. # Add default server (localhost).  
    12. $gmclient->addServer();  
    13.   
    14. echo "Sending job ";  
    15.   
    16. # Send reverse job  
    17. do  
    18. {  
    19.   $result$gmclient->do("reverse""Hello!");  
    20.   # Check for various return packets and errors.  
    21.   switch($gmclient->returnCode())  
    22.   {  
    23.     case GEARMAN_WORK_DATA:  
    24.       echo "Data: $result ";  
    25.       break;  
    26.     case GEARMAN_WORK_STATUS:  
    27.       list($numerator$denominator)= $gmclient->doStatus();  
    28.       echo "Status: $numerator/$denominator complete ";  
    29.       break;  
    30.     case GEARMAN_SUCCESS:  
    31.       break;  
    32.     default:  
    33.       echo "RET: " . $gmclient->returnCode() . " ";  
    34.       exit;  
    35.   }  
    36. }  
    37. while($gmclient->returnCode() != GEARMAN_SUCCESS);  
    38. echo "Success: $result ";  
    39.   
    40. ?>  



    1. <?php  
    2. /* 
    3.  * 阻塞方式Worker,处理"Hello!"转换为"!olleH" 
    4.  */  
    5.   
    6. echo "Starting ";  
    7.   
    8. # Create our worker object.  
    9. $gmworkernew GearmanWorker();  
    10. $gmworker->setTimeout(5000);  
    11.   
    12. # Add default server (localhost).  
    13. $gmworker->addServer();  
    14.   
    15. # Register function "reverse" with the server. Change the worker function to  
    16. "reverse_fn_fast" for a faster worker with no output.  
    17. $gmworker->addFunction("reverse""reverse_fn");  
    18.   
    19. print "Waiting for job... ";  
    20. while($gmworker->work())  
    21. {  
    22.   if ($gmworker->returnCode() != GEARMAN_SUCCESS)  
    23.   {  
    24.     echo "return_code: " . $gmworker->returnCode() . " ";  
    25.     break;  
    26.   }  
    27.   echo "receve and proced a job!";  
    28. }  
    29.   
    30. function reverse_fn($job)  
    31. {  
    32.   echo "Received job: " . $job->handle() . " ";  
    33.   
    34.   $workload$job->workload();  
    35.   $workload_size$job->workloadSize();  
    36.   
    37.   echo "Workload: $workload ($workload_size) ";  
    38.   
    39.   # This status loop is not needed, just showing how it works  
    40.   for ($x= 0; $x < $workload_size$x++)  
    41.   {  
    42.     echo "Sending status: $x/$workload_size complete ";  
    43.     /* 
    44.     $job->sendStatus($x, $workload_size); 
    45.     sleep(1); 
    46.     */  
    47.   }  
    48.   
    49.   $resultstrrev($workload);  
    50.   echo "Result: $result ";  
    51.   
    52.   # Return what we want to send back to the client.  
    53.   return $result;  
    54. }  
    55.   
    56. # A much simpler and less verbose version of the above function would be:  
    57. function reverse_fn_fast($job)  
    58. {  
    59.   return strrev($job->workload());  
    60. }  
    61.   
    62. ?>  
      1. <?php  
      2. /* 
      3.  * 非阻塞方式Worker,处理"Hello!"转换为"!olleH" 
      4.  */  
      5.   
      6. echo "Starting ";  
      7.   
      8. # Create our worker object.  
      9. $gmworkernew GearmanWorker();  
      10. $gmworker->setTimeout(1000);  
      11. $gmworker->addOptions(GEARMAN_WORKER_NON_BLOCKING); # Make the worker non-blocking  
      12.   
      13. # Add default server (localhost).  
      14. $gmworker->addServer();  
      15.   
      16. # Register function "reverse" with the server. Change the worker function to  
      17. "reverse_fn_fast" for a faster worker with no output.  
      18. $gmworker->addFunction("reverse""reverse_fn");  
      19.   
      20. print "Waiting for job... ";  
      21.   
      22. while ( ($ret = $gmworker->work()) || $gmworker->returnCode() == GEARMAN_IO_WAIT || $gmworker->returnCode() == GEARMAN_NO_JOBS) {  
      23.   echo "return_code: " . $gmworker->returnCode() . " ";  
      24.   if ($gmworker->returnCode() == GEARMAN_SUCCESS) {  
      25.     continue;  
      26.   }  
      27.   if ( !$gmworker->wait() ) {  
      28.     echo "return_code: " . $gmworker->returnCode() . " ";  
      29.     if ($gmworker->returnCode() == GEARMAN_NO_ACTIVE_FDS) {  
      30.       # We are not connected to any servers, so wait a bit before  
      31.       # trying to reconnect. sleep(5);  
      32.       continue;  
      33.     }  
      34.     break;  
      35.   }  
      36. }  
      37.   
      38. function reverse_fn($job)  
      39. {  
      40.   echo "Received job: " . $job->handle() . " ";  
      41.   
      42.   $workload$job->workload();  
      43.   $workload_size$job->workloadSize();  
      44.   
      45.   echo "Workload: $workload ($workload_size) ";  
      46.   
      47.   # This status loop is not needed, just showing how it works  
      48.   for ($x= 0; $x < $workload_size$x++)  
      49.   {  
      50.     echo "Sending status: $x/$workload_size complete ";  
      51.     /* 
      52.     $job->sendStatus($x, $workload_size); 
      53.     sleep(1); 
      54.     */  
      55.   }  
      56.   
      57.   $resultstrrev($workload);  
      58.   echo "Result: $result ";  
      59.   
      60.   # Return what we want to send back to the client.  
      61.   return $result;  
      62. }  
      63.   
      64. # A much simpler and less verbose version of the above function would be:  
      65. function reverse_fn_fast($job)  
      66. {  
      67.   return strrev($job->workload());  
      68. }  
      69.   
      70. ?> 
  • 相关阅读:
    luogu P2852 [USACO06DEC]Milk Patterns G
    FZOJ 4267 树上统计
    CF1303G Sum of Prefix Sums
    luogu P5311 [Ynoi2011]成都七中
    luogu P5306 [COCI2019] Transport
    SP34096 DIVCNTK
    luogu P5325 【模板】Min_25筛
    luogu P1742 最小圆覆盖
    求两直线交点坐标
    1098: 复合函数求值(函数专题)
  • 原文地址:https://www.cnblogs.com/shenming/p/3628358.html
Copyright © 2011-2022 走看看