Using Kafka from C++ with Protobuf for Cross-Language Data Exchange
Kafka is a distributed streaming platform. What exactly does that mean?
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data
To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.
First a few concepts:
- Kafka is run as a cluster on one or more servers that can span multiple datacenters.
- The Kafka cluster stores streams of records in categories called topics.
- Each record consists of a key, a value, and a timestamp.
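To make these concepts concrete, here is a minimal in-memory sketch of a record and of topics as named, append-only logs. `Record`, `ToyCluster`, `publish`, and `read_from` are hypothetical names used only for illustration. They are not part of Kafka or librdkafka, and the sketch ignores partitions, replication, and durability.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of a record: a key, a value, and a timestamp.
struct Record {
    std::string key;
    std::string value;
    int64_t timestamp_ms;
};

// Hypothetical stand-in for a cluster: each topic is a named, append-only log.
class ToyCluster {
public:
    // Append a record to the topic and return its offset within that topic.
    std::size_t publish(const std::string &topic, const Record &rec) {
        std::vector<Record> &log = topics_[topic];
        log.push_back(rec);
        return log.size() - 1;
    }

    // Return all records at or after `offset`. Reading does not remove anything.
    std::vector<Record> read_from(const std::string &topic, std::size_t offset) const {
        std::map<std::string, std::vector<Record> >::const_iterator it = topics_.find(topic);
        if (it == topics_.end() || offset >= it->second.size())
            return std::vector<Record>();
        return std::vector<Record>(
            it->second.begin() + static_cast<std::ptrdiff_t>(offset),
            it->second.end());
    }

private:
    std::map<std::string, std::vector<Record> > topics_;
};
```

Because reading leaves the log untouched, several independent readers can each keep their own offset into the same topic, which is the behavior the "store streams of records" capability describes.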
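There are many summaries of what Kafka is useful for. In brief:
- Provides message persistence with O(1) time complexity, keeping access time constant even for terabytes of data or more.
- High throughput: even on very cheap commodity hardware, a single machine can handle more than 100K messages per second.
- Supports message partitioning across Kafka servers and distributed consumption, while guaranteeing in-order delivery of messages within each partition.
- Supports both offline and real-time data processing.
- Scale out: supports online horizontal scaling.
- Decoupling: It is extremely difficult to predict at the start of a project what requirements it will face later. A messaging system inserts an implicit, data-based interface layer between processing stages, and both sides implement that interface. This lets you extend or modify either side independently, as long as both keep to the same interface contract.
- Redundancy: In some cases, processing data fails. Unless the data has been persisted, it is lost. A message queue persists data until it has been fully processed, which avoids the risk of data loss. In the "insert-get-delete" paradigm used by many message queues, your processing system must explicitly indicate that a message has been processed before it is removed from the queue, so your data is kept safely until you are done with it.
- Scalability: Because a message queue decouples your processing stages, increasing the rate at which messages are enqueued and processed is easy: just add more processing workers. No code changes or parameter tuning are needed. Scaling is as simple as turning up a power dial.
- Flexibility and peak handling: An application still needs to keep working when traffic surges, but such bursts are uncommon, and keeping resources on standby sized for these peaks would be a huge waste. A message queue lets critical components withstand sudden load instead of collapsing completely under a burst of excessive requests.
- Recoverability: When part of the system fails, the whole system is not affected. A message queue reduces coupling between processes, so even if a process that handles messages dies, the messages already in the queue can still be processed once the system recovers.
- Ordering guarantees: In most use cases, the order in which data is processed matters. Most message queues are inherently ordered and can guarantee that data is processed in a particular order. Kafka guarantees the ordering of messages within a partition.
- Buffering: In any non-trivial system, some components need different amounts of processing time. For example, loading an image takes less time than applying a filter to it. A message queue uses a buffer layer to help tasks run as efficiently as possible: writes to the queue can proceed as fast as possible. This buffering helps control and optimize the speed at which data flows through the system.
- Asynchronous communication: Often, users do not want or need to process a message immediately. A message queue provides asynchronous processing: a user can put a message on the queue without processing it right away, enqueue as many messages as desired, and process them later when needed.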
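The per-partition ordering guarantee mentioned above can be illustrated with a small sketch: if the partition is chosen by hashing the message key, all messages with the same key land in the same partition and keep their relative order there. `ToyTopic`, `pick_partition`, and `djb_hash` are hypothetical names for this illustration. The hash is a DJB2-style function chosen for simplicity, which is an assumption and not necessarily what a Kafka client uses by default.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// DJB2-style string hash, used here only as an illustrative key hash.
inline unsigned int djb_hash(const std::string &s) {
    unsigned int hash = 5381;
    for (std::size_t i = 0; i < s.size(); i++)
        hash = ((hash << 5) + hash) + static_cast<unsigned char>(s[i]);
    return hash;
}

// Map a key to a partition index in [0, partition_cnt).
inline int32_t pick_partition(const std::string &key, int32_t partition_cnt) {
    assert(partition_cnt > 0);
    return static_cast<int32_t>(djb_hash(key) %
                                static_cast<unsigned int>(partition_cnt));
}

// Hypothetical toy topic: one append-only log per partition.
class ToyTopic {
public:
    explicit ToyTopic(int32_t partition_cnt)
        : logs_(static_cast<std::size_t>(partition_cnt)) {}

    // Append a value to the partition selected by the key; return that partition.
    int32_t append(const std::string &key, const std::string &value) {
        int32_t p = pick_partition(key, static_cast<int32_t>(logs_.size()));
        logs_[static_cast<std::size_t>(p)].push_back(value);
        return p;
    }

    // Read-only view of one partition's log, in append order.
    const std::vector<std::string> &partition(int32_t p) const {
        return logs_.at(static_cast<std::size_t>(p));
    }

private:
    std::vector<std::vector<std::string> > logs_;
};
```

Messages with different keys may be interleaved in the same partition or spread over different ones, so ordering holds only within a partition, not across the whole topic.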
librdkafka is a C library implementation of the Apache Kafka protocol, providing Producer, Consumer and Admin clients. It was designed with message delivery reliability and high performance in mind, current figures exceed 1 million msgs/second for the producer and 3 million msgs/second for the consumer.
librdkafka is licensed under the 2-clause BSD license.
Features
- Full Exactly-Once-Semantics (EOS) support
- High-level producer, including Idempotent and Transactional producers
- High-level balanced KafkaConsumer (requires broker >= 0.9)
- Simple (legacy) consumer
- Admin client
- Compression: snappy, gzip, lz4, zstd
- SSL support
- SASL (GSSAPI/Kerberos/SSPI, PLAIN, SCRAM, OAUTHBEARER) support
- Full list of supported KIPs
- Broker version support: >=0.8 (see Broker version compatibility)
- Guaranteed API stability for C & C++ APIs (ABI safety guaranteed for C)
- Statistics metrics
- Debian package: librdkafka1 and librdkafka-dev in Debian and Ubuntu
- RPM package: librdkafka and librdkafka-devel
- Gentoo package: dev-libs/librdkafka
- Portable: runs on Linux, OSX, Win32, Solaris, FreeBSD, AIX, ...
github: https://github.com/edenhill/librdkafka
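Several of the features listed above (compression, the idempotent producer) are switched on through configuration properties. The sketch below only collects property names and values in a plain map and checks the codec against the list above. `make_producer_settings` and `is_supported_codec` are hypothetical helpers, and the broker address in the usage note is an assumed placeholder. The property names `bootstrap.servers`, `compression.codec`, and `enable.idempotence` are librdkafka configuration properties.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Compression codecs named in the feature list, plus "none" for no compression.
inline bool is_supported_codec(const std::string &codec) {
    static const char *const names[] = {"none", "snappy", "gzip", "lz4", "zstd"};
    static const std::set<std::string> codecs(names, names + 5);
    return codecs.count(codec) > 0;
}

// Hypothetical helper: name/value pairs for an idempotent, compressed producer.
inline std::map<std::string, std::string>
make_producer_settings(const std::string &brokers, const std::string &codec) {
    assert(is_supported_codec(codec));
    std::map<std::string, std::string> settings;
    settings["bootstrap.servers"] = brokers;
    settings["compression.codec"] = codec;
    settings["enable.idempotence"] = "true";
    return settings;
}
```

A caller might build the map with `make_producer_settings("localhost:9092", "lz4")` and then pass each pair to `RdKafka::Conf::set(name, value, errstr)`, checking for `RdKafka::Conf::CONF_OK` in the same way the example program does for `compression.codec`.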
The code for producing and consuming Kafka data with rdkafka is as follows:
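#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <getopt.h>  /* POSIX getopt() */
#include "rdkafkacpp.h"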
/* Print the brokers, topics, and partitions described by a metadata response. */
static void metadata_print (const std::string &topic,
                            const RdKafka::Metadata *metadata) {
  std::cout << "Metadata for "
            << (topic.empty() ? std::string("all topics") : topic)
            << " (from broker " << metadata->orig_broker_id()
            << ":" << metadata->orig_broker_name() << ")" << std::endl;

  /* Iterate brokers */
  std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
  RdKafka::Metadata::BrokerMetadataIterator ib;
  for (ib = metadata->brokers()->begin();
       ib != metadata->brokers()->end();
       ++ib) {
    std::cout << " broker " << (*ib)->id() << " at "
              << (*ib)->host() << ":" << (*ib)->port() << std::endl;
  }

  /* Iterate topics */
  std::cout << metadata->topics()->size() << " topics:" << std::endl;
  RdKafka::Metadata::TopicMetadataIterator it;
  for (it = metadata->topics()->begin();
       it != metadata->topics()->end();
       ++it) {
    std::cout << " topic \"" << (*it)->topic() << "\" with "
              << (*it)->partitions()->size() << " partitions:";

    if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
      std::cout << " " << RdKafka::err2str((*it)->err());
      if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE)
        std::cout << " (try again)";
    }
    std::cout << std::endl;

    /* Iterate topic's partitions */
    RdKafka::TopicMetadata::PartitionMetadataIterator ip;
    for (ip = (*it)->partitions()->begin();
         ip != (*it)->partitions()->end();
         ++ip) {
      std::cout << " partition " << (*ip)->id()
                << ", leader " << (*ip)->leader()
                << ", replicas: ";

      /* Iterate partition's replicas */
      RdKafka::PartitionMetadata::ReplicasIterator ir;
      for (ir = (*ip)->replicas()->begin();
           ir != (*ip)->replicas()->end();
           ++ir) {
        std::cout << (ir == (*ip)->replicas()->begin() ? "":",") << *ir;
      }

      /* Iterate partition's ISRs */
      std::cout << ", isrs: ";
      RdKafka::PartitionMetadata::ISRSIterator iis;
      for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
        std::cout << (iis == (*ip)->isrs()->begin() ? "":",") << *iis;

      if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
        std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;
      else
        std::cout << std::endl;
    }
  }
}

/* Cleared by the signal handler (or on fatal errors) to stop the main loop. */
static volatile sig_atomic_t run = 1;
static bool exit_eof = false;

static void sigterm (int sig) {
  run = 0;
}

/* Reports the delivery status of each produced message. */
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb (RdKafka::Message &message) {
    std::string status_name;
    switch (message.status())
      {
      case RdKafka::Message::MSG_STATUS_NOT_PERSISTED:
        status_name = "NotPersisted";
        break;
      case RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED:
        status_name = "PossiblyPersisted";
        break;
      case RdKafka::Message::MSG_STATUS_PERSISTED:
        status_name = "Persisted";
        break;
      default:
        status_name = "Unknown?";
        break;
      }
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        status_name << ": " << message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};

/* Handles client-level events: errors, statistics, and log lines. */
class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
      {
      case RdKafka::Event::EVENT_ERROR:
        if (event.fatal()) {
          std::cerr << "FATAL ";
          run = 0;
        }
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
      }
  }
};

/* Use of this partitioner is pretty pointless since no key is provided
 * in the produce() call. */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
 public:
  int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
                          int32_t partition_cnt, void *msg_opaque) {
    return djb_hash(key->c_str(), key->size()) % partition_cnt;
  }
 private:
  static inline unsigned int djb_hash (const char *str, size_t len) {
    unsigned int hash = 5381;
    for (size_t i = 0 ; i < len ; i++)
      hash = ((hash << 5) + hash) + str[i];
    return hash;
  }
};

/* Print one consumed message, or react to the error it carries. */
void msg_consume(RdKafka::Message* message, void* opaque) {
  const RdKafka::Headers *headers;

  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      break;

    case RdKafka::ERR_NO_ERROR:
      /* Real message */
      std::cout << "Read msg at offset " << message->offset() << std::endl;
      if (message->key()) {
        std::cout << "Key: " << *message->key() << std::endl;
      }
      headers = message->headers();
      if (headers) {
        std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
        for (size_t i = 0 ; i < hdrs.size() ; i++) {
          const RdKafka::Headers::Header hdr = hdrs[i];

          if (hdr.value() != NULL)
            printf(" Header: %s = \"%.*s\"\n",
                   hdr.key().c_str(),
                   (int)hdr.value_size(), (const char *)hdr.value());
          else
            printf(" Header: %s = NULL\n", hdr.key().c_str());
        }
      }
      printf("%.*s\n",
             static_cast<int>(message->len()),
             static_cast<const char *>(message->payload()));
      break;

    case RdKafka::ERR__PARTITION_EOF:
      /* Last message */
      if (exit_eof) {
        run = 0;
      }
      break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      run = 0;
      break;

    default:
      /* Errors */
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      run = 0;
  }
}

/* Adapter that forwards callback-based consumption to msg_consume(). */
class ExampleConsumeCb : public RdKafka::ConsumeCb {
 public:
  void consume_cb (RdKafka::Message &msg, void *opaque) {
    msg_consume(&msg, opaque);
  }
};

int main (int argc, char **argv) {
  std::string brokers = "localhost";
  std::string errstr;
  std::string topic_str;
  std::string mode;
  std::string debug;
  int32_t partition = RdKafka::Topic::PARTITION_UA;
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  bool do_conf_dump = false;
  int opt;
  MyHashPartitionerCb hash_partitioner;
  int use_ccb = 0;

  /*
   * Create configuration objects
   */
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:AM:f:")) != -1) {
    switch (opt) {
    case 'P':
    case 'C':
    case 'L':
      mode = opt;
      break;
    case 't':
      topic_str = optarg;
      break;
    case 'p':
      if (!strcmp(optarg, "random"))
        /* default */;
      else if (!strcmp(optarg, "hash")) {
        if (tconf->set("partitioner_cb", &hash_partitioner, errstr) !=
            RdKafka::Conf::CONF_OK) {
          std::cerr << errstr << std::endl;
          exit(1);
        }
      } else
        partition = std::atoi(optarg);
      break;
    case 'b':
      brokers = optarg;
      break;
    case 'z':
      if (conf->set("compression.codec", optarg, errstr) !=
          RdKafka::Conf::CONF_OK) {
        std::cerr << errstr << std::endl;
        exit(1);
      }
      break;
    case 'o':
      if (!strcmp(optarg, "end"))
        start_offset = RdKafka::Topic::OFFSET_END;
      else if (!strcmp(optarg, "beginning"))
        start_offset = RdKafka::Topic::OFFSET_BEGINNING;
      else if (!strcmp(optarg, "stored"))
        start_offset = RdKafka::Topic::OFFSET_STORED;
      else
        start_offset = strtoll(optarg, NULL, 10);
      break;
    case 'e':
      exit_eof = true;
      break;
    case 'd':
      debug = optarg;
      break;
    case 'M':
      if (conf->set("statistics.interval.ms", optarg, errstr) !=
          RdKafka::Conf::CONF_OK) {
        std::cerr << errstr << std::endl;
        exit(1);
      }
      break;
    case 'X':
      {
        char *name, *val;

        if (!strcmp(optarg, "dump")) {
          do_conf_dump = true;
          continue;
        }

        name = optarg;
        if (!(val = strchr(name, '='))) {
          std::cerr << "%% Expected -X property=value, not " <<
              name << std::endl;
          exit(1);
        }

        *val = '