zoukankan      html  css  js  c++  java
  • 使用librdkafka库实现kafka的生产和消费实例消费者

    一、消费者

    1、创建kafka配置

    rd_kafka_conf_t *rd_kafka_conf_new (void) 

    2、创建kafka topic的配置

    rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)

    3、创建kafka各项参数

    rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
                                           const char *name,  
                                           const char *value,  
                                           char *errstr, size_t errstr_size)  

    4、配置kafka topic各项参数

    rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,  
                             const char *name,  
                             const char *value,  
                             char *errstr, size_t errstr_size)  

    5、创建consumer实例

    rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)  

    6、为consumer实例添加broker list

    int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

    7、开启consumer订阅

    rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics) 

    8、轮询消息或事件,并调用回掉函数

    rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)  

    9、关闭consumer实例

    rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)

    10、释放topic list 资源

    rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)

    11、销毁consumer实例

    void rd_kafka_destroy (rd_kafka_t *rk)

    12、等待consumer对象的销毁

    int rd_kafka_wait_destroyed (int timeout_ms)

    代码示例consumer.c:

    #include <string.h>
    #include <stdlib.h>
    #include <syslog.h>
    #include <signal.h>
    #include <error.h>
    #include <getopt.h>
    
    #include "../src/rdkafka.h"
    
    static int run = 1;
    //`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。
    static rd_kafka_t *rk;
    static rd_kafka_topic_partition_list_t *topics;
    
    static void stop (int sig) {
      if (!run)
        exit(1);
      run = 0;
      fclose(stdin); /* abort fgets() */
    }
    
    static void sig_usr1 (int sig) {
      rd_kafka_dump(stdout, rk);
    }
    
    /**
     * 处理并打印已消费的消息
     */
    static void msg_consume (rd_kafka_message_t *rkmessage,
           void *opaque) {
      if (rkmessage->err) {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
          fprintf(stderr,
            "%% Consumer reached end of %s [%"PRId32"] "
                 "message queue at offset %"PRId64"\n",
                 rd_kafka_topic_name(rkmessage->rkt),
                 rkmessage->partition, rkmessage->offset);
    
          return;
        }
    
        if (rkmessage->rkt)
                fprintf(stderr, "%% Consume error for "
                        "topic \"%s\" [%"PRId32"] "
                        "offset %"PRId64": %s\n",
                        rd_kafka_topic_name(rkmessage->rkt),
                        rkmessage->partition,
                        rkmessage->offset,
                        rd_kafka_message_errstr(rkmessage));
        else
                fprintf(stderr, "%% Consumer error: %s: %s\n",
                        rd_kafka_err2str(rkmessage->err),
                        rd_kafka_message_errstr(rkmessage));
    
        if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
            rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
              run = 0;
        return;
      }
    
      fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
                          "offset %"PRId64", %zd bytes):\n",
                          rd_kafka_topic_name(rkmessage->rkt),
                          rkmessage->partition,
        rkmessage->offset, rkmessage->len);
    
      if (rkmessage->key_len) {
        printf("Key: %.*s\n",
                 (int)rkmessage->key_len, (char *)rkmessage->key);
      }
    
      printf("%.*s\n",
               (int)rkmessage->len, (char *)rkmessage->payload);
      
    }
    
    /*
      init all configuration of kafka
     */
    int initKafka(char *brokers, char *group,char *topic){
      rd_kafka_conf_t *conf;
      rd_kafka_topic_conf_t *topic_conf;
      rd_kafka_resp_err_t err;
      char tmp[16];
      char errstr[512];
    
      /* Kafka configuration */
      conf = rd_kafka_conf_new();
    
      //quick termination
      snprintf(tmp, sizeof(tmp), "%i", SIGIO);
      rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
    
      //topic configuration
      topic_conf = rd_kafka_topic_conf_new();
    
      /* Consumer groups require a group id */
      if (!group)
              group = "rdkafka_consumer_example";
      if (rd_kafka_conf_set(conf, "group.id", group,
                            errstr, sizeof(errstr)) !=
          RD_KAFKA_CONF_OK) {
              fprintf(stderr, "%% %s\n", errstr);
              return -1;
      }
    
      /* Consumer groups always use broker based offset storage */
      if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
                                  "broker",
                                  errstr, sizeof(errstr)) !=
          RD_KAFKA_CONF_OK) {
              fprintf(stderr, "%% %s\n", errstr);
              return -1;
      }
    
      /* Set default topic config for pattern-matched topics. */
      rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
    
      //实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态
      rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
      if(!rk){
        fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);
        return -1;
      }
    
      //Librdkafka需要至少一个brokers的初始化list
      if (rd_kafka_brokers_add(rk, brokers) == 0){
        fprintf(stderr, "%% No valid brokers specified\n");
        return -1;
      }
    
      //重定向 rd_kafka_poll()队列到consumer_poll()队列
      rd_kafka_poll_set_consumer(rk);
    
      //创建一个Topic+Partition的存储空间(list/vector)
      topics = rd_kafka_topic_partition_list_new(1);
      //把Topic+Partition加入list
      rd_kafka_topic_partition_list_add(topics, topic, -1);
      //开启consumer订阅,匹配的topic将被添加到订阅列表中
      if((err = rd_kafka_subscribe(rk, topics))){
          fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
          return -1;
      }
    
      return 1;
    }
    
    int main(int argc, char **argv){
      char *brokers = "localhost:9092";
      char *group = NULL;
      char *topic = NULL;
      
      int opt;
      rd_kafka_resp_err_t err;
    
      while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){
        switch (opt) {
          case 'b':
            brokers = optarg;
            break;
          case 'g':
            group = optarg;
            break;
          case 't':
            topic = optarg;
            break;
          default:
            break;
        }
      } 
    
      signal(SIGINT, stop);
      signal(SIGUSR1, sig_usr1);
    
      if(!initKafka(brokers, group, topic)){
        fprintf(stderr, "kafka server initialize error\n");
      }else{
        while(run){
          rd_kafka_message_t *rkmessage;
          /*-轮询消费者的消息或事件,最多阻塞timeout_ms
            -应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务
            所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,
            因为它需要被正确地调用和处理以同步内部消费者状态 */
          rkmessage = rd_kafka_consumer_poll(rk, 1000);
          if(rkmessage){
            msg_consume(rkmessage, NULL);
            /*释放rkmessage的资源,并把所有权还给rdkafka*/
            rd_kafka_message_destroy(rkmessage);
          }
        }
      }
    
    done:
        /*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),
        commit offset到broker,并离开consumer group
        最大阻塞时间被设置为session.timeout.ms
        */
        err = rd_kafka_consumer_close(rk);
        if(err){
          fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
        }else{
          fprintf(stderr, "%% Consumer closed\n");
        }
    
        //释放topics list使用的所有资源和它自己
        rd_kafka_topic_partition_list_destroy(topics);
    
        //destroy kafka handle
        rd_kafka_destroy(rk);
      
        run = 5;
        //等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1
        while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){
          printf("Waiting for librdkafka to decommission\n");
        }
        if(run <= 0){
          //dump rdkafka内部状态到stdout流
          rd_kafka_dump(stdout, rk);
        }
    
        return 0;
    }

    原文博客:http://blog.csdn.net/lijinqi1987/article/details/76582067

  • 相关阅读:
    ORM查询相关的操作
    分享一些珍藏和网上搜集的一些接码平台
    DRF: serializers ModelSerializer的序列化中model在有外键的情况下显示name代替显示id的几种方式
    Django Rest framework中序列化A表时怎么获取B表的数据
    10步入门Django Rest framework后端接口框架
    Django Rest framework后端接口框架,常用的子类视图
    redis学习(九)——数据持久化
    Java8之lambda表达式
    Java多线程(九)—— interrupt()和线程终止方式
    redis学习(八)——redis应用场景
  • 原文地址:https://www.cnblogs.com/GnibChen/p/8604585.html
Copyright © 2011-2022 走看看