zoukankan      html  css  js  c++  java
  • kafka C客户端librdkafka producer源码分析

    简介

    kafka网站上提供了C语言的客户端librdkafka,地址在这

    librdkafka是使用C语言根据apache kafka 协议实现的客户端。另外这个客户端还有简单的c++接口。客户端作者对这个客户端比较上心,经常会修改bug并提交新功能。

    librdkafka的基本原理和我之前博客说的java版producer类似,一个线程向队列中加数据,另一个线程通过非阻塞的方式从队列中取出数据,并写入到broker。

    源码分析

    源码包含两个文件夹src和src-cpp

    image image

    src是用c实现的源码,而src-cpp是在c接口上包装的一层c++类,实现了基本的功能。

    代码运行流程如下

    1、rd_kafka_conf_set设置全局配置

    2、rd_kafka_topic_conf_set设置topic配置

    3、rd_kafka_brokers_add设置broker地址,启动向broker发送消息的线程

    4、rd_kafka_new启动kafka主线程

    5、rd_kafka_topic_new建topic

    6、rd_kafka_produce使用本函数发送消息

    7、rd_kafka_poll调用回调函数

    还是看发送一条消息的过程

    入队列过程

     调用rd_kafka_produce可以将消息写到队列

    1 int rd_kafka_produce (...) {
    2     //调用rd_kafka_msg_new
    3     return rd_kafka_msg_new(...);
    4 }
    首先先将消息包装成rd_kafka_msg_t类型,然后获取分区并相应的队列
    复制代码
    1 int rd_kafka_msg_new (...) {
    2     ...
    3     //创建消息,将传入的参数转换为rkm
    4     rkm = rd_kafka_msg_new0(...);
    5     //分区并入队
    6     err = rd_kafka_msg_partitioner(rkt, rkm, 1);
    7     ...
    8     return -1;
    9 }
    复制代码
    复制代码
     1 int rd_kafka_msg_partitioner (...) {
     2      ...
     3      //获取分区号
     4      switch (rkt->rkt_state)
     5      {
     6          ...
     7      }
     8     //获取分区
     9     rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
    10     ...
    11     //加入队列
    12     rd_kafka_toppar_enq_msg(rktp_new, rkm);
    13     return 0;
    14 }
    复制代码

    出队列过程

    添加broker的过程中就启动了扫描队列的操作

    复制代码
     1 static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
     2                            rd_kafka_confsource_t source,
     3                            const char *name, uint16_t port,
     4                            int32_t nodeid) {
     5     ...
     6     pthread_attr_init(&attr);
     7     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
     8     //启动向broker发送消息的主线程
     9     if ((err = pthread_create(&rkb->rkb_thread, &attr,
    10                   rd_kafka_broker_thread_main, rkb))) {
    11         ...
    12         return NULL;
    13     }
    14     //将broker加到broker队列中
    15     TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
    16     (void)rd_atomic_add(&rkb->rkb_rk->rk_broker_cnt, 1);
    17     ...
    18     return rkb;
    19 }
    复制代码

    启动rd_kafka_broker_thread_main主线程

    复制代码
     1 static void *rd_kafka_broker_thread_main (void *arg) {
     2     ...
     3     while (!rkb->rkb_rk->rk_terminate) {
     4         switch (rkb->rkb_state)
     5         {
     6         //如果broker连接未初始化,或中断,则不断重连broker
     7         case RD_KAFKA_BROKER_STATE_INIT:
     8         case RD_KAFKA_BROKER_STATE_DOWN:
     9             if (rd_kafka_broker_connect(rkb) == -1) {
    10                 ...
    11             }
    12             break;
    13         //如果broker连接已经建立,则调用serve函数
    14         case RD_KAFKA_BROKER_STATE_UP:
    15             if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA)
    16                 rd_kafka_broker_ua_idle(rkb);
    17             else if (rk->rk_type == RD_KAFKA_PRODUCER)
    18                 rd_kafka_broker_producer_serve(rkb);
    19             else if (rk->rk_type == RD_KAFKA_CONSUMER)
    20                 rd_kafka_broker_consumer_serve(rkb);
    21             break;
    22         }
    23     }
    24     ...
    25     return NULL;
    26 }
    复制代码

    只看producer的处理函数,该函数扫描消息并发送

    复制代码
     1 static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
     2     ...
     3     while (!rkb->rkb_rk->rk_terminate &&
     4            rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
     5         ...
     6         do {
     7             cnt = 0;
     8             ...
     9             //扫描所有的topic-partitions,并发送消息
    10             TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
    11                 ...
    12                 //将入队过程中的队列rktp_msgq加到rktp_xmit_msgq中
    13                 if (rktp->rktp_msgq.rkmq_msg_cnt > 0)
    14                     rd_kafka_msgq_concat(&rktp->
    15                                  rktp_xmit_msgq,
    16                                  &rktp->rktp_msgq);
    17                 rd_kafka_toppar_unlock(rktp);
    18                 //扫描消息队列中数据是否超时
    19                 if (unlikely(do_timeout_scan))
    20                     rd_kafka_msgq_age_scan(&rktp->
    21                                    rktp_xmit_msgq,
    22                                    &timedout,
    23                                    now);
    24                 //队列为空则从头继续
    25                 if (rktp->rktp_xmit_msgq.rkmq_msg_cnt == 0)
    26                     continue;
    27                 
    28                 //如果没有超时,或者没达到处理消息数量的阈值,则从头继续,这样批处理可以提高性能
    29                 if (rktp->rktp_ts_last_xmit +
    30                     (rkb->rkb_rk->rk_conf.
    31                      buffering_max_ms * 1000) > now &&
    32                     rktp->rktp_xmit_msgq.rkmq_msg_cnt <
    33                     rkb->rkb_rk->rk_conf.
    34                     batch_num_messages) {
    35                     /* Wait for more messages */
    36                     continue;
    37                 }
    38 
    39                 rktp->rktp_ts_last_xmit = now;
    40 
    41                 //按协议转换并填充数据到rkb中
    42                 while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) {
    43                     int r = rd_kafka_broker_produce_toppar(
    44                         rkb, rktp);
    45                     if (likely(r > 0))
    46                         cnt += r;
    47                     else
    48                         break;
    49                 }
    50             }
    51 
    52         } while (cnt);
    53 
    54         //触发数据发送情况的回调函数,将发送失败的写到一个操作结果队列中
    55         if (unlikely(isrfailed.rkmq_msg_cnt > 0))
    56             rd_kafka_dr_msgq(rkb->rkb_rk, &isrfailed,
    57                      RD_KAFKA_RESP_ERR__ISR_INSUFF);
    58 
    59         if (unlikely(timedout.rkmq_msg_cnt > 0))
    60             rd_kafka_dr_msgq(rkb->rkb_rk, &timedout,
    61                      RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
    62 
    63         rd_kafka_broker_toppars_unlock(rkb);
    64 
    65         /* Check and move retry buffers */
    66         if (unlikely(rkb->rkb_retrybufs.rkbq_cnt) > 0)
    67             rd_kafka_broker_retry_bufs_move(rkb);
    68 
    69         rd_kafka_broker_unlock(rkb);
    70 
    71         //开始在网络上发送数据
    72                 rd_kafka_broker_io_serve(rkb);
    73 
    74         /* Scan wait-response queue
    75          * Note: 'now' may be a bit outdated by now. */
    76         if (do_timeout_scan)
    77             rd_kafka_broker_waitresp_timeout_scan(rkb, now);
    78 
    79         rd_kafka_broker_lock(rkb);
    80     }
    81 
    82     rd_kafka_broker_unlock(rkb);
    83 } 
    复制代码
    通过poll处理网络事件,将消息从网络发送到broker
    复制代码
     1 static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) {
     2     rd_kafka_op_t *rko;
     3     rd_ts_t now = rd_clock();
     4     //处理broker操作
     5     if (unlikely(rd_kafka_q_len(&rkb->rkb_ops) > 0))
     6         while ((rko = rd_kafka_q_pop(&rkb->rkb_ops, RD_POLL_NOWAIT)))
     7             rd_kafka_broker_op_serve(rkb, rko);
     8     //请求metadata
     9     if (unlikely(now >= rkb->rkb_ts_metadata_poll))
    10         rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL,
    11                                              NULL, "periodic refresh");
    12     //如果有消息,手动增加写事件
    13     if (rkb->rkb_outbufs.rkbq_cnt > 0)
    14         rkb->rkb_pfd.events |= POLLOUT;
    15     else
    16         rkb->rkb_pfd.events &= ~POLLOUT;
    17     if (poll(&rkb->rkb_pfd, 1,
    18          rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0)
    19         return;
    20     //poll函数,处理各种事件,发送消息时,只处理写事件,当请求metadata时,处理读事件
    21     if (rkb->rkb_pfd.revents & POLLIN)
    22         while (rd_kafka_recv(rkb) > 0)
    23             ;
    24     if (rkb->rkb_pfd.revents & POLLHUP)
    25         return rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
    26                         "Connection closed");
    27     if (rkb->rkb_pfd.revents & POLLOUT)
    28         while (rd_kafka_send(rkb) > 0)
    29             ;
    30 }
    复制代码
    
    

    问题

    librdkafka不像java客户端那样,可以通过future.get()实现同步发送。所以,如果broker不能连通的话,send方法还是可以正常将消息放入队列。这会导致两个问题

    1、我们的客户端是不会知道broker已经挂掉了,因而不能对这种情况作出及时处理,导致消息全部堆积在内存中,如果此时不幸,我们的客户端也挂掉了,那这部分消息就全部丢失了。

    2、如果broker一直没有恢复,而我们一直向队列中写数据的话,producer中有一个选项message.timeout.ms,如果超过了设定的消息超时时间,那么会有线程清理队列中的数据,导致消息丢失,而如果将时间设置为0(永不超时)的话,将导致客户端内存撑满。

    上面这个问题可以通过如下方法实现的同步发送来解决

    复制代码
     1 void dr_cb (...err, , void *msg_opaque) {
     2      int *produce_statusp = (int *)msg_opaque;
     3 
     4      /* set sync_produce()'s produce_status value to the error code (which can be NO_ERROR) */
     5      *produce_statusp = err;
     6 }
     7 
     8 int sync_produce (rkt, msg..) {
     9    int produce_status = -100000; /* or some other magic value that is not proper value in rd_kafka_resp_err_t */
    10 
    11    rd_kafka_produce(rkt, ..msg, .., &produce_status /* msg_opaque */);
    12 
    13    do {
    14      /* poll dr and error callbacks. */
    15      rd_kafka_poll(rk, 1000);
    16     /* wait for dr_cb to be called and setting produce_status to the error value. */
    17    } while (produce_status == -100000);
    18 
    19   if (produce_status == RD_KAFKA_RESP_ERR_NO_ERROR)
    20    return SUCCESS!;
    21   else
    22    return FAILURE;
    23 }
    复制代码
  • 相关阅读:
    父亲节前参考四级考试
    rpm小解
    oracle忘记sys/system/scott用户的密码怎么办
    yum 小解
    linux下设置swap文件
    启动mysql 报错: ERROR 2002 (HY000): Can’t connect to local MySQL server through socket ‘/var/lib/mysql/mysql.sock’ (2)
    mysql 常用命令
    wget安装
    删除mysql
    什么是swap分区
  • 原文地址:https://www.cnblogs.com/the-tops/p/5765297.html
Copyright © 2011-2022 走看看