五 Kafka producer 拦截器(interceptor)
5.1 拦截器原理
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会
对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor
按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是
org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs) 获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在
消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保
证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在 producer 回调逻辑
触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很
重的逻辑,否则会拖慢 producer 的消息发送效率
(4)close:
关闭 interceptor,主要用于执行一些资源清理工作
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
5.2 拦截器案例
1)需求:
实现一个简单的双 interceptor 组成的拦截链。
第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
2)案例实操
(1)增加时间戳拦截器
package cn.bw.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record)
{
// 创建一个新的 record,把时间戳写入消息体的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(),
record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
(2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
package cn.bw.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record)
{
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
(3)producer 主程序
package cn.bw.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 设置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 2 构建拦截链
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.bw.kafka.interceptor.TimeInterceptor");
interceptors.add("cn.bw.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "first";
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic,"message" + i);
producer.send(record);
}
// 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
producer.close();
}
}
3)测试
(1)在 kafka 上启动消费者,然后运行客户端 java 程序。
[hadoop@node01 kafka]$ in/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
(2)观察 java 平台控制台输出数据如下:
Successful sent: 10
Failed sent: 0
六 kafka Streams
6.1 概述
6.1.1 Kafka Streams (数据处理)
Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的
库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。
6.1.2 Kafka Streams 特点
1)功能强大
高扩展性,弹性,容错
2)轻量级
无需专门的集群
一个库,而不是框架
3)完全集成
100%的 Kafka 0.10.0 版本兼容
易于集成到现有的应用程序
4)实时性
毫秒级延迟
并非微批处理
窗口允许乱序数据
2
6.1.3 为什么要有 Kafka Stream
当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark
Streaming 和 Apache Storm。Apache Storm 发展多年,应用广泛,提供记录级别的处理能力,
当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便与图计算,
SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外,
目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache
Spark,使得部署更容易。
既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?笔者认为主要有如下原因。
第一,Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的
流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难
了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式
处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部
署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的
打包和部署基本没有任何要求。
第三,就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的
kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主
流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用Kafka Stream 的成本非常低。
第四,使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm
的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占
用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不占用系统资源。
第五,由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。
第六,由于 Kafka Consumer Rebalance 机制,Kafka Stream 可以在线动态调整并行度。
6.2 Kafka Stream 案例
6.2.1 eclipse 打包插件安装
1)将net.sf.fjep.fatjar_0.0.32.jar拷贝到eclipse安装目录中的plugins目录下,然后重启eclipse即可。
2)插件使用方法
6.2.2 数据清洗案例
0)需求:
实时处理单词带有”>>>”前缀的内容。例如输入”hadoop>>>ximenqing”,最终处理成 “ximenqing”
注:如果使用 maven,加入如下依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
1)创建主类
package cn.bw.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定义输入的 topic
String from = "first";
// 定义输出的 topic
String to = "second";
// 设置参数
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
StreamsConfig config = new StreamsConfig(settings);
// 构建拓扑
//0.10 版本如下
//TopologyBuilder builder = new TopologyBuilder();
//2.0 版本如下
Topology builder = new Topology();
builder.addSource("SOURCE", from).addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具体分析处理
return new LogProcessor();
}
}, "SOURCE").addSink("SINK", to, "PROCESS");
// 创建 kafka stream
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
2)具体业务处理
package cn.bw.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”则只保留该标记后面的内容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 输出到下一个 topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
4) 将程序用 eclipse 插件打成 jar 包
5) 创建 first topic
./kafka-topics.sh --create --topic first --replication-factor 2 --partitions 3 --zookeeper node01:2181
6) 创建 second topic
./kafka-topics.sh --create --topic second --replication-factor 2 --partitions 3 --zookeeper node01:2181
7) 验证 topic 是否创建完成
./kafka-topics.sh --list --zookeeper node01:2181
8)将 jar 包拷贝 node01 上运行
[hadoop@node01 kafka]$ java -jar kafka0508_fat.jar cn.bw.kafka.stream.Application
注:maven 插件如果没有指定运行主类使用下面的命令运行:
java -cp processor.jar com.bw.kafka.streams.LogFilter
5)在 node02 上启动生产者
[hadoop@node02 kafka]$ bin/kafka-console-producer.sh --broker-list node02:9092 --topic first
>hello>>>world
>h>>>hadoop
>hahaha
6)在 node03 上启动消费者
[hadoop@node03 kafka]$ bin/kafka-console-consumer.sh --zookeeper node03:2181 --from-beginning --topic second
world
hadoop
hahaha