zoukankan      html  css  js  c++  java
  • kafka之c接口常用API------librdkafka

    1 安装方法以及相关库文件

      https://github.com/edenhill/librdkafka

    • High-level producer
    • High-level consumer
    • Simple (Low-level) consumer
    • 压缩:snappy, gzip, lz4
    • SSL
    • SASL

      consumer有两套API,高级(high-level)和底层(simple)的,应该叫底层API或者低级API,它跟高级API的区别是没有自动负载均衡,而高级API会自动进行负载均衡。

    3 kafka主要的用途

      发数据---->producer

        //发一条

        rd_kafka_produce()

        //发多条

        rd_kafka_produce_batch()

      收数据---->consumer

      在收发数据之前至少需要一个统一的句柄,方便kafka内部准备好链接brokers集群,初始化kafka内部结构等

      

      建立这个kafka句柄需要知道连接到哪个broker

      

      发布消息使用rd_kafka_top_t, 

     1 // 对rd_kafka_topic_partition_list_t结构的操作
     2 // 创建
     3 rd_kafka_topic_partition_list_new();
     4 // 增加元素
     5 rd_kafka_topic_partition_list_add();
     6 // 删除元素
     7 rd_kafka_topic_partition_list_del();
     8 // 查找元素
     9 rd_kafka_topic_partition_list_find();
    10 
    11 // 对rd_kafka_topic_t的操作
    12 // 创建
    13 rd_kafka_topic_new();
    14 // 删除
    15 rd_kafka_topic_destroy();
    16 // 获取该topic的名字
    17 rd_kafka_topic_name();
    18 // 获取该topic传入的应用参数
    19 rd_kafka_topic_opaque();
    20 
    21 // 使用rd_kafka_topic_partition_list_t的时候,topic+partition是连在一起的,
    22 // 所以给kafka句柄的时候只用一个参数就够了
    23 // 订阅消息
    24 rd_kafka_subscribe (rd_kafka_t *rk,
    25                             const rd_kafka_topic_partition_list_t *topics);
    26 // 指定消费的partition,可以在运行时更换
    27 rd_kafka_assign (rd_kafka_t *rk,
    28                             const rd_kafka_topic_partition_list_t *partitions);
    29 
    30 // 用rd_kafka_topic_t比较麻烦,需要配合一个partition才行
    31 // 直接启动consumer了
    32 rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
    33                             int64_t offset);
    34 // 每次接收也要带上partition
    35 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
    36                             int timeout_ms);
    37 
    38 
    39 // 发送的时候使用 rd_kafka_topic_t
    40 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
    41                             int msgflags,
    42                             void *payload, size_t len,
    43                             const void *key, size_t keylen,
    44                             void *msg_opaque);
    View Code

    后续用了继续做笔记,关于错误处理,topic_partition_list操作等

    https://blog.csdn.net/lijinqi1987/article/details/76582067

    http://suntus.github.io/2016/07/07/librdkafka--kafka%20C%20api%E4%BB%8B%E7%BB%8D/

  • 相关阅读:
    mysql(一) 关联查询的方式
    SpringBoot2.0(五) CORS跨域
    SpringBoot2.0(四) 远程调试
    SpringBoot2.0(三) 文件上传
    SpringBoot2.0(二) 配置文件多环境
    SpringBoot2.0(一) mybatis
    Java InputStream转File
    git 命令学习
    reids 中出现 (error) MOVED 原因和解决方案
    ibm 的 heapanalyzer 分析器
  • 原文地址:https://www.cnblogs.com/lanjianhappy/p/10636838.html
Copyright © 2011-2022 走看看