zoukankan      html  css  js  c++  java
  • c语言使用librdkafka库实现kafka的生产和消费实例(转)

    关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的生产、消费

    一、producer

    librdkafka进行kafka生产操作的大致步骤如下:

    1、创建kafka配置

    [cpp] view plain copy
     
    1. rd_kafka_conf_t *rd_kafka_conf_new (void)  

    2、配置kafka各项参数

    [cpp] view plain copy
     
    1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
    2.                                        const char *name,  
    3.                                        const char *value,  
    4.                                        char *errstr, size_t errstr_size)  

    3、设置发送回调函数

    [cpp] view plain copy
     
    1. void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,  
    2.                                   void (*dr_msg_cb) (rd_kafka_t *rk,  
    3.                                   const rd_kafka_message_t *  
    4.                                   rkmessage,  
    5.                                   void *opaque))  

    4、创建producer实例

    [cpp] view plain copy
     
    1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)  

    5、实例化topic

    [cpp] view plain copy
     
    1. rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)  

    6、异步调用将消息发送到指定的topic

    [cpp] view plain copy
     
    1. int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,  
    2.               int msgflags,  
    3.               void *payload, size_t len,  
    4.               const void *key, size_t keylen,  
    5.               void *msg_opaque)  

    7、阻塞等待消息发送完成

    [cpp] view plain copy
     
    1. int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)  

    8、等待完成producer请求完成

    [cpp] view plain copy
     
    1. rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)  

    9、销毁topic

    [cpp] view plain copy
     
    1. void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)  

    10、销毁producer实例

    [cpp] view plain copy
     
    1. void rd_kafka_destroy (rd_kafka_t *rk)  


    完整代码如下my_producer.c:

    [cpp] view plain copy
     
    1. #include <stdio.h>  
    2. #include <signal.h>  
    3. #include <string.h>  
    4.   
    5. #include "../src/rdkafka.h"  
    6.   
    7. static int run = 1;  
    8.   
    9. static void stop(int sig){  
    10.     run = 0;  
    11.     fclose(stdin);  
    12. }  
    13.   
    14. /* 
    15.     每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) 
    16.     还是传递失败(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) 
    17.     该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行 
    18.  */  
    19. static void dr_msg_cb(rd_kafka_t *rk,  
    20.                       const rd_kafka_message_t *rkmessage, void *opaque){  
    21.         if(rkmessage->err)  
    22.             fprintf(stderr, "%% Message delivery failed: %s ",   
    23.                     rd_kafka_err2str(rkmessage->err));  
    24.         else  
    25.             fprintf(stderr,  
    26.                         "%% Message delivered (%zd bytes, "  
    27.                         "partition %"PRId32") ",  
    28.                         rkmessage->len, rkmessage->partition);  
    29.         /* rkmessage被librdkafka自动销毁*/  
    30. }  
    31.   
    32. int main(int argc, char **argv){  
    33.     rd_kafka_t *rk;            /*Producer instance handle*/  
    34.     rd_kafka_topic_t *rkt;     /*topic对象*/  
    35.     rd_kafka_conf_t *conf;     /*临时配置对象*/  
    36.     char errstr[512];            
    37.     char buf[512];               
    38.     const char *brokers;         
    39.     const char *topic;           
    40.   
    41.     if(argc != 3){  
    42.         fprintf(stderr, "%% Usage: %s <broker> <topic> ", argv[0]);  
    43.         return 1;  
    44.     }  
    45.   
    46.     brokers = argv[1];  
    47.     topic = argv[2];  
    48.   
    49.     /* 创建一个kafka配置占位 */  
    50.     conf = rd_kafka_conf_new();  
    51.   
    52.     /*创建broker集群*/  
    53.     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,  
    54.                 sizeof(errstr)) != RD_KAFKA_CONF_OK){  
    55.         fprintf(stderr, "%s ", errstr);  
    56.         return 1;  
    57.     }  
    58.   
    59.     /*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数 
    60.      *应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/  
    61.     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);  
    62.   
    63.     /*创建producer实例 
    64.       rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/  
    65.     rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));  
    66.     if(!rk){  
    67.         fprintf(stderr, "%% Failed to create new producer:%s ", errstr);  
    68.         return 1;  
    69.     }  
    70.   
    71.     /*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic 
    72.     对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/  
    73.     rkt = rd_kafka_topic_new(rk, topic, NULL);  
    74.     if (!rkt){  
    75.         fprintf(stderr, "%% Failed to create topic object: %s ",   
    76.                 rd_kafka_err2str(rd_kafka_last_error()));  
    77.         rd_kafka_destroy(rk);  
    78.         return 1;  
    79.     }  
    80.   
    81.     /*用于中断的信号*/  
    82.     signal(SIGINT, stop);  
    83.   
    84.     fprintf(stderr,  
    85.                 "%% Type some text and hit enter to produce message "  
    86.                 "%% Or just hit enter to only serve delivery reports "  
    87.                 "%% Press Ctrl-C or Ctrl-D to exit ");  
    88.   
    89.      while(run && fgets(buf, sizeof(buf), stdin)){  
    90.         size_t len = strlen(buf);  
    91.   
    92.         if(buf[len-1] == ' ')  
    93.             buf[--len] = '';  
    94.   
    95.         if(len == 0){  
    96.             /*轮询用于事件的kafka handle, 
    97.             事件将导致应用程序提供的回调函数被调用 
    98.             第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/  
    99.             rd_kafka_poll(rk, 0);  
    100.             continue;  
    101.         }  
    102.   
    103.      retry:  
    104.          /*Send/Produce message. 
    105.            这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列, 
    106.            对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb) 
    107.            用于在消息传递成功或失败时向应用程序发回信号*/  
    108.         if (rd_kafka_produce(  
    109.                     /* Topic object */  
    110.                     rkt,  
    111.                     /*使用内置的分区来选择分区*/  
    112.                     RD_KAFKA_PARTITION_UA,  
    113.                     /*生成payload的副本*/  
    114.                     RD_KAFKA_MSG_F_COPY,  
    115.                     /*消息体和长度*/  
    116.                     buf, len,  
    117.                     /*可选键及其长度*/  
    118.                     NULL, 0,  
    119.                     NULL) == -1){  
    120.             fprintf(stderr,   
    121.                 "%% Failed to produce to topic %s: %s ",   
    122.                 rd_kafka_topic_name(rkt),  
    123.                 rd_kafka_err2str(rd_kafka_last_error()));  
    124.   
    125.             if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){  
    126.                 /*如果内部队列满,等待消息传输完成并retry, 
    127.                 内部队列表示要发送的消息和已发送或失败的消息, 
    128.                 内部队列受限于queue.buffering.max.messages配置项*/  
    129.                 rd_kafka_poll(rk, 1000);  
    130.                 goto retry;  
    131.             }     
    132.         }else{  
    133.             fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s ",   
    134.                 len, rd_kafka_topic_name(rkt));  
    135.         }  
    136.   
    137.         /*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为 
    138.         传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其 
    139.         发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll() 
    140.         仍然被调用*/  
    141.         rd_kafka_poll(rk, 0);  
    142.      }  
    143.   
    144.      fprintf(stderr, "%% Flushing final message..  ");  
    145.      /*rd_kafka_flush是rd_kafka_poll()的抽象化, 
    146.      等待所有未完成的produce请求完成,通常在销毁producer实例前完成 
    147.      以确保所有排列中和正在传输的produce请求在销毁前完成*/  
    148.      rd_kafka_flush(rk, 10*1000);  
    149.   
    150.      /* Destroy topic object */  
    151.      rd_kafka_topic_destroy(rkt);  
    152.   
    153.      /* Destroy the producer instance */  
    154.      rd_kafka_destroy(rk);  
    155.   
    156.      return 0;  
    157. }  



    二、consumer

    librdkafka进行kafka消费操作的大致步骤如下:

    1、创建kafka配置

    [cpp] view plain copy
     
    1. rd_kafka_conf_t *rd_kafka_conf_new (void)  

    2、创建kafka topic的配置

    [cpp] view plain copy
     
    1. rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)   

    3、配置kafka各项参数

    [cpp] view plain copy
     
    1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
    2.                                        const char *name,  
    3.                                        const char *value,  
    4.                                        char *errstr, size_t errstr_size)  

    4、配置kafka topic各项参数

    [cpp] view plain copy
     
    1. rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,  
    2.                          const char *name,  
    3.                          const char *value,  
    4.                          char *errstr, size_t errstr_size)  

    5、创建consumer实例

    [cpp] view plain copy
     
    1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)  

    6、为consumer实例添加brokerlist

    [cpp] view plain copy
     
    1. int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)  

    7、开启consumer订阅

    [cpp] view plain copy
     
    1. rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)  

    8、轮询消息或事件,并调用回调函数

    [cpp] view plain copy
     
    1. rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)  

    9、关闭consumer实例

    [cpp] view plain copy
     
    1. rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)  

    10、释放topic list资源

    [cpp] view plain copy
     
    1. rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)  

    11、销毁consumer实例

    [cpp] view plain copy
     
    1. void rd_kafka_destroy (rd_kafka_t *rk)   

    12、等待consumer对象的销毁

    [cpp] view plain copy
     
    1. int rd_kafka_wait_destroyed (int timeout_ms)  

    完整代码如下my_consumer.c

    [cpp] view plain copy
     
    1. #include <string.h>  
    2. #include <stdlib.h>  
    3. #include <syslog.h>  
    4. #include <signal.h>  
    5. #include <error.h>  
    6. #include <getopt.h>  
    7.   
    8. #include "../src/rdkafka.h"  
    9.   
    10. static int run = 1;  
    11. //`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。  
    12. static rd_kafka_t *rk;  
    13. static rd_kafka_topic_partition_list_t *topics;  
    14.   
    15. static void stop (int sig) {  
    16.   if (!run)  
    17.     exit(1);  
    18.   run = 0;  
    19.   fclose(stdin); /* abort fgets() */  
    20. }  
    21.   
    22. static void sig_usr1 (int sig) {  
    23.   rd_kafka_dump(stdout, rk);  
    24. }  
    25.   
    26. /** 
    27.  * 处理并打印已消费的消息 
    28.  */  
    29. static void msg_consume (rd_kafka_message_t *rkmessage,  
    30.        void *opaque) {  
    31.   if (rkmessage->err) {  
    32.     if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {  
    33.       fprintf(stderr,  
    34.         "%% Consumer reached end of %s [%"PRId32"] "  
    35.              "message queue at offset %"PRId64" ",  
    36.              rd_kafka_topic_name(rkmessage->rkt),  
    37.              rkmessage->partition, rkmessage->offset);  
    38.   
    39.       return;  
    40.     }  
    41.   
    42.     if (rkmessage->rkt)  
    43.             fprintf(stderr, "%% Consume error for "  
    44.                     "topic "%s" [%"PRId32"] "  
    45.                     "offset %"PRId64": %s ",  
    46.                     rd_kafka_topic_name(rkmessage->rkt),  
    47.                     rkmessage->partition,  
    48.                     rkmessage->offset,  
    49.                     rd_kafka_message_errstr(rkmessage));  
    50.     else  
    51.             fprintf(stderr, "%% Consumer error: %s: %s ",  
    52.                     rd_kafka_err2str(rkmessage->err),  
    53.                     rd_kafka_message_errstr(rkmessage));  
    54.   
    55.     if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||  
    56.         rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)  
    57.           run = 0;  
    58.     return;  
    59.   }  
    60.   
    61.   fprintf(stdout, "%% Message (topic %s [%"PRId32"], "  
    62.                       "offset %"PRId64", %zd bytes): ",  
    63.                       rd_kafka_topic_name(rkmessage->rkt),  
    64.                       rkmessage->partition,  
    65.     rkmessage->offset, rkmessage->len);  
    66.   
    67.   if (rkmessage->key_len) {  
    68.     printf("Key: %.*s ",  
    69.              (int)rkmessage->key_len, (char *)rkmessage->key);  
    70.   }  
    71.   
    72.   printf("%.*s ",  
    73.            (int)rkmessage->len, (char *)rkmessage->payload);  
    74.     
    75. }  
    76.   
    77. /* 
    78.   init all configuration of kafka 
    79.  */  
    80. int initKafka(char *brokers, char *group,char *topic){  
    81.   rd_kafka_conf_t *conf;  
    82.   rd_kafka_topic_conf_t *topic_conf;  
    83.   rd_kafka_resp_err_t err;  
    84.   char tmp[16];  
    85.   char errstr[512];  
    86.   
    87.   /* Kafka configuration */  
    88.   conf = rd_kafka_conf_new();  
    89.   
    90.   //quick termination  
    91.   snprintf(tmp, sizeof(tmp), "%i", SIGIO);  
    92.   rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);  
    93.   
    94.   //topic configuration  
    95.   topic_conf = rd_kafka_topic_conf_new();  
    96.   
    97.   /* Consumer groups require a group id */  
    98.   if (!group)  
    99.           group = "rdkafka_consumer_example";  
    100.   if (rd_kafka_conf_set(conf, "group.id", group,  
    101.                         errstr, sizeof(errstr)) !=  
    102.       RD_KAFKA_CONF_OK) {  
    103.           fprintf(stderr, "%% %s ", errstr);  
    104.           return -1;  
    105.   }  
    106.   
    107.   /* Consumer groups always use broker based offset storage */  
    108.   if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",  
    109.                               "broker",  
    110.                               errstr, sizeof(errstr)) !=  
    111.       RD_KAFKA_CONF_OK) {  
    112.           fprintf(stderr, "%% %s ", errstr);  
    113.           return -1;  
    114.   }  
    115.   
    116.   /* Set default topic config for pattern-matched topics. */  
    117.   rd_kafka_conf_set_default_topic_conf(conf, topic_conf);  
    118.   
    119.   //实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态  
    120.   rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));  
    121.   if(!rk){  
    122.     fprintf(stderr, "%% Failed to create new consumer:%s ", errstr);  
    123.     return -1;  
    124.   }  
    125.   
    126.   //Librdkafka需要至少一个brokers的初始化list  
    127.   if (rd_kafka_brokers_add(rk, brokers) == 0){  
    128.     fprintf(stderr, "%% No valid brokers specified ");  
    129.     return -1;  
    130.   }  
    131.   
    132.   //重定向 rd_kafka_poll()队列到consumer_poll()队列  
    133.   rd_kafka_poll_set_consumer(rk);  
    134.   
    135.   //创建一个Topic+Partition的存储空间(list/vector)  
    136.   topics = rd_kafka_topic_partition_list_new(1);  
    137.   //把Topic+Partition加入list  
    138.   rd_kafka_topic_partition_list_add(topics, topic, -1);  
    139.   //开启consumer订阅,匹配的topic将被添加到订阅列表中  
    140.   if((err = rd_kafka_subscribe(rk, topics))){  
    141.       fprintf(stderr, "%% Failed to start consuming topics: %s ", rd_kafka_err2str(err));  
    142.       return -1;  
    143.   }  
    144.   
    145.   return 1;  
    146. }  
    147.   
    148. int main(int argc, char **argv){  
    149.   char *brokers = "localhost:9092";  
    150.   char *group = NULL;  
    151.   char *topic = NULL;  
    152.     
    153.   int opt;  
    154.   rd_kafka_resp_err_t err;  
    155.   
    156.   while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){  
    157.     switch (opt) {  
    158.       case 'b':  
    159.         brokers = optarg;  
    160.         break;  
    161.       case 'g':  
    162.         group = optarg;  
    163.         break;  
    164.       case 't':  
    165.         topic = optarg;  
    166.         break;  
    167.       default:  
    168.         break;  
    169.     }  
    170.   }   
    171.   
    172.   signal(SIGINT, stop);  
    173.   signal(SIGUSR1, sig_usr1);  
    174.   
    175.   if(!initKafka(brokers, group, topic)){  
    176.     fprintf(stderr, "kafka server initialize error ");  
    177.   }else{  
    178.     while(run){  
    179.       rd_kafka_message_t *rkmessage;  
    180.       /*-轮询消费者的消息或事件,最多阻塞timeout_ms 
    181.         -应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务 
    182.         所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要, 
    183.         因为它需要被正确地调用和处理以同步内部消费者状态 */  
    184.       rkmessage = rd_kafka_consumer_poll(rk, 1000);  
    185.       if(rkmessage){  
    186.         msg_consume(rkmessage, NULL);  
    187.         /*释放rkmessage的资源,并把所有权还给rdkafka*/  
    188.         rd_kafka_message_destroy(rkmessage);  
    189.       }  
    190.     }  
    191.   }  
    192.   
    193. done:  
    194.     /*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置), 
    195.     commit offset到broker,并离开consumer group 
    196.     最大阻塞时间被设置为session.timeout.ms 
    197.     */  
    198.     err = rd_kafka_consumer_close(rk);  
    199.     if(err){  
    200.       fprintf(stderr, "%% Failed to close consumer: %s ", rd_kafka_err2str(err));  
    201.     }else{  
    202.       fprintf(stderr, "%% Consumer closed ");  
    203.     }  
    204.   
    205.     //释放topics list使用的所有资源和它自己  
    206.     rd_kafka_topic_partition_list_destroy(topics);  
    207.   
    208.     //destroy kafka handle  
    209.     rd_kafka_destroy(rk);  
    210.     
    211.     run = 5;  
    212.     //等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1  
    213.     while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){  
    214.       printf("Waiting for librdkafka to decommission ");  
    215.     }  
    216.     if(run <= 0){  
    217.       //dump rdkafka内部状态到stdout流  
    218.       rd_kafka_dump(stdout, rk);  
    219.     }  
    220.   
    221.     return 0;  
    222. }  


    在linux下编译producer和consumer的代码:

    [cpp] view plain copy
     
    1. gcc my_producer.c -o my_producer  -lrdkafka -lz -lpthread -lrt  
    2. gcc my_consumer.c -o my_consumer  -lrdkafka -lz -lpthread -lrt  

    在运行my_producer或my_consumer时可能会报错"error while loading shared libraries xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录


    在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic
    启动方式可参考 kafka0.8.2集群的环境搭建并实现基本的生产消费

    启动consumer:

    启动producer,并发送一条数据“hello world”:


    consumer处成功收到producer发送的“hello world”:

    http://orchome.com/5

    https://github.com/edenhill/librdkafka

    https://github.com/mfontanini/cppkafka

    https://github.com/zengyuxing007/kafka_test_cpp

  • 相关阅读:
    Final TFS 2008 Feature List
    来看看微软对测试是什么要求
    淘宝设计流程
    Disable try catch
    jQuery validate API
    iPhone手机开发平台入门介绍和教程
    VSSpeedster Speed up your VS 2010
    Where are the SDK tools? Where is ildasm?
    效率高的删除语句truncate table [tablename]
    修改Hosts去除各站广告
  • 原文地址:https://www.cnblogs.com/wangbin/p/8192372.html
Copyright © 2011-2022 走看看