zoukankan      html  css  js  c++  java
  • Linux下librdkafka客户端的编译运行

    Linux下librdkafka客户端的编译运行

      librdkafka是一个开源的Kafka客户端C/C++实现,提供了Kafka生产者、消费者接口。

      由于项目需要,我要将Kafka生产者接口封装起来给别人调用,所以先安装了librdkakfa,然后在demo上进行修改封装一个生产者接口。

    [一] 安装librdkafka

       首先在github上下载librdkafka源码,解压后进行编译;

       cd librdkafka-master

       chmod 777 configure lds-gen.py

       ./configure

       make

       make install

       在make的时候,如果是64位Linux会报下面这个异常

       /bin/ld:librdkafka.lds:1: syntax error in VERSION script

       只要Makefile.config里面的WITH_LDS=y这一行注释掉就不会报错了。

    [二] 封装librdkafka的生产者接口

    #include <ctype.h>
    #include <signal.h>
    #include <string.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <syslog.h>
    #include <time.h>
    #include <sys/time.h>
    
    #include "librdkafka/rdkafka.h"  /* for Kafka driver */
    
    static int run = 1;
    static rd_kafka_t *rk;
    rd_kafka_topic_t *rkt;
    int partition = RD_KAFKA_PARTITION_UA;
    rd_kafka_topic_conf_t *topic_conf;
    
    static void stop (int sig) {
        run = 0;
        fclose(stdin); /* abort fgets() */
    }
    
    static void sig_usr1 (int sig) {
        rd_kafka_dump(stdout, rk);
    }
    
    int initProducer (char *parameters) {
        int argc = 1;
        char **argv;
        char *para;
        char *delim = " ";
        char *brokers = "localhost:9092";
        char *topic = NULL;
        int opt;
        rd_kafka_conf_t *conf;
        char errstr[512];
        char tmp[16];
    
        char copyParameters[1024];
        strcpy(copyParameters, parameters);
        para = strtok(parameters, delim);
        argc++;
        while((para = strtok(NULL, delim)) != NULL){
            argc++;
        }
        argv = (char**)malloc(argc*sizeof(char*));
        argc = 0;
        argv[argc] = "initProducer";
        para = strtok(copyParameters, delim);
        argc++;
        argv[argc] = para;
        while((para = strtok(NULL, delim)) != NULL){
            argc++;
            argv[argc] = para;
        }
        argc++;
        /* Kafka configuration */
        conf = rd_kafka_conf_new();
        /* Quick termination */
        snprintf(tmp, sizeof(tmp), "%i", SIGIO);
        rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
        /* Topic configuration */
        topic_conf = rd_kafka_topic_conf_new();
        while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:")) != -1) {
            switch (opt) {
            case 't':
                topic = optarg;
                break;
            case 'p':
                partition = atoi(optarg);
                break;
            case 'b':
                brokers = optarg;
                break;
            default:
                fprintf(stderr, "%% Failed to init producer with error parameters
    ");
            }
        }
        if (optind != argc || !topic) {
            exit(1);
        }
        signal(SIGINT, stop);
        signal(SIGUSR1, sig_usr1);
        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) {
            fprintf(stderr, "%% Failed to create new producer: %s
    ", errstr);
            exit(1);
        }
        rd_kafka_set_log_level(rk, LOG_DEBUG);
        /* Add brokers */
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
            fprintf(stderr, "%% No valid brokers specified
    ");
            exit(1);
        }
        /* Create topic */
        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        topic_conf = NULL; /* Now owned by topic */
        return 1;
    }
    
    int freeProducer()
    {
        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);
        /* Destroy the handle */
        rd_kafka_destroy(rk);
        if (topic_conf)
            rd_kafka_topic_conf_destroy(topic_conf);
        /* Let background threads clean up and terminate cleanly. */
        run = 5;
        while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
            printf("Waiting for librdkafka to decommission
    ");
        if (run <= 0)
            rd_kafka_dump(stdout, rk);
        return 1;
    }
    
    int main (int argc, char **argv)
    {
        char parameter[] = "-t XX-HTTP-KEYWORD-LOG -b 10.10.6.101:9092,10.10.6.102:9092,10.10.6.104:9092";
        char buf[1024];
        initProducer(parameter);
        while (run && fgets(buf, sizeof(buf), stdin)) {
            if(rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buf, strlen(buf), NULL, 0, NULL) == -1){
                fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s
    ", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error()));
            }else{
                fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i
    ", strlen(buf), rd_kafka_topic_name(rkt), partition);
            }
        }
        freeProducer();
        return 0;
    }

    [三] 编译运行

       编译的时候要加上-lrdkafka -lz -lpthread -lrt这些选项:gcc myProducer.c -o myProducer -lrdkafka -lz -lpthread -lrt

       在编译的时候会报error while loading share library librdkafak.so.1,这是因为make的时候将librdkafak.so.1放在了/usr/local/lib下,在Linux的默认共享库路径/lib和/usr/lib下找不到,只要执行下面两句就可以了:

       echo "/usr/local/lib" >> /etc/ld.so.conf
       ldconfig

       运行./myProducer,会不断的从终端读取键入的字符串,然后发送到Kafka,通过Kafka自带的console consumer能够消费查看数据。

  • 相关阅读:
    mybatis plus foreach 的用法
    mongodb聚合查询
    mongodb and 和 or 查询
    mongodb全文搜索
    时间参数的传递
    rabbitmq
    AOP各种的实现
    OWASP Top 10十大风险 – 10个最重大的Web应用风险与攻防
    OAuth2.0认证和授权机制讲解
    MySQL主从复制
  • 原文地址:https://www.cnblogs.com/vincent-vg/p/5855924.html
Copyright © 2011-2022 走看看