zoukankan      html  css  js  c++  java
  • 五 、 Kafka producer 拦截器(interceptor) 和 六 、Kafka Streaming案例

    五 Kafka producer 拦截器(interceptor)
      5.1 拦截器原理
        Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
        对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会
          对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor
          按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是
        org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
        (1)configure(configs) 获取配置信息和初始化数据时调用。
        (2)onSend(ProducerRecord):该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在
                        消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保
                        证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
        (3)onAcknowledgement(RecordMetadata, Exception):
                  该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在 producer 回调逻辑
                  触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很
                  重的逻辑,否则会拖慢 producer 的消息发送效率
        (4)close
            关闭 interceptor,主要用于执行一些资源清理工作
            如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer         按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
      5.2 拦截器案例
          1)需求:
            实现一个简单的双 interceptor 组成的拦截链。
            第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
            第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
                                              
          2)案例实操
            (1)增加时间戳拦截器
                package cn.bw.kafka.interceptor;
                import java.util.Map;
                import org.apache.kafka.clients.producer.ProducerInterceptor;
                import org.apache.kafka.clients.producer.ProducerRecord;
                import org.apache.kafka.clients.producer.RecordMetadata;
                public class TimeInterceptor implements ProducerInterceptor<String, String> {
                @Override
                public void configure(Map<String, ?> configs) {
                }
                @Override
                public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record)
                {
                // 创建一个新的 record,把时间戳写入消息体的最前部
                return new ProducerRecord(record.topic(), record.partition(), record.timestamp(),
                record.key(),
                  System.currentTimeMillis() + "," + record.value().toString());
                }
                @Override
                public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                }
                @Override
                public void close() {
                }
                }
    (2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
                package cn.bw.kafka.interceptor;
                import java.util.Map;
                import org.apache.kafka.clients.producer.ProducerInterceptor;
                import org.apache.kafka.clients.producer.ProducerRecord;
                import org.apache.kafka.clients.producer.RecordMetadata;
                public class CounterInterceptor implements ProducerInterceptor<String, String>{
                    private int errorCounter = 0;
                    private int successCounter = 0;
                    @Override
                    public void configure(Map<String, ?> configs) {
                    }
                    @Override
                    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record)
                    {    
                    return record;
                    }
                    @Override
                    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                        // 统计成功和失败的次数
                        if (exception == null) {
                          successCounter++;
                        } else {
                          errorCounter++;
                        }
                      }
                    @Override
                    public void close() {
                      // 保存结果
                      System.out.println("Successful sent: " + successCounter);
                      System.out.println("Failed sent: " + errorCounter);
                      }
                    }
        (3)producer 主程序
              package cn.bw.kafka.interceptor;
              import java.util.ArrayList;
              import java.util.List;
              import java.util.Properties;
              import org.apache.kafka.clients.producer.KafkaProducer;
              import org.apache.kafka.clients.producer.Producer;
              import org.apache.kafka.clients.producer.ProducerConfig;
              import org.apache.kafka.clients.producer.ProducerRecord;
              public class InterceptorProducer {
                public static void main(String[] args) throws Exception {
                   // 1 设置配置信息
                  Properties props = new Properties();
                  props.put("bootstrap.servers", "node01:9092");
                  props.put("acks", "all");
                  props.put("retries", 0);
                  props.put("batch.size", 16384);
                  props.put("linger.ms", 1);
                  props.put("buffer.memory", 33554432);
                  props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
                  props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
                    // 2 构建拦截链
                   List<String> interceptors = new ArrayList<>();
                   interceptors.add("cn.bw.kafka.interceptor.TimeInterceptor");
                   interceptors.add("cn.bw.kafka.interceptor.CounterInterceptor");
                   props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
                   String topic = "first";
                   Producer<String, String> producer = new KafkaProducer<>(props);
                    // 3 发送消息
                   for (int i = 0; i < 10; i++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(topic,"message" + i);
                        producer.send(record);
                    }
                    // 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
                    producer.close();
                      }
                    }
      3)测试
        (1)在 kafka 上启动消费者,然后运行客户端 java 程序。
            [hadoop@node01 kafka]$ in/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first
            1501904047034,message0
            1501904047225,message1
            1501904047230,message2
            1501904047234,message3
            1501904047236,message4
            1501904047240,message5
            1501904047243,message6
            1501904047246,message7
            1501904047249,message8
            1501904047252,message9
        (2)观察 java 平台控制台输出数据如下:
            Successful sent: 10
            Failed sent: 0
    六 kafka Streams
        6.1 概述
            6.1.1 Kafka Streams (数据处理)
              Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的
              库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。
            6.1.2 Kafka Streams 特点
              1)功能强大
                高扩展性,弹性,容错
              2)轻量级
                无需专门的集群
                一个库,而不是框架
              3)完全集成
                 100%的 Kafka 0.10.0 版本兼容
                  易于集成到现有的应用程序
              4)实时性
                  毫秒级延迟
                  并非微批处理
                  窗口允许乱序数据
                  2
            6.1.3 为什么要有 Kafka Stream
                  当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark
              Streaming 和 Apache Storm。Apache Storm 发展多年,应用广泛,提供记录级别的处理能力,
              当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便与图计算,
              SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外,
              目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache
              Spark,使得部署更容易。
                  既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?笔者认为主要有如下原因。
                    第一,Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的
                       流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难
                       了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式
                       处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
                          
                    第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部
                        署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的
                        打包和部署基本没有任何要求。
                    第三,就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的
                        kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主
                        流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用Kafka Stream 的成本非常低。
                    第四,使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm
                        的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占
                        用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不占用系统资源。
                    第五,由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。
                    第六,由于 Kafka Consumer Rebalance 机制,Kafka Stream 可以在线动态调整并行度。
      6.2 Kafka Stream 案例
            6.2.1 eclipse 打包插件安装
              1)将net.sf.fjep.fatjar_0.0.32.jar拷贝到eclipse安装目录中的plugins目录下,然后重启eclipse即可。
              2)插件使用方法
                                                                      
                                             

                                           

          

        6.2.2 数据清洗案例
              0)需求:
                 实时处理单词带有”>>>”前缀的内容。例如输入”hadoop>>>ximenqing”,最终处理成   “ximenqing”
                                      
              注:如果使用 maven,加入如下依赖
                <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-streams</artifactId>
                  <version>2.0.0</version>
                </dependency>
            1)创建主类
                package cn.bw.kafka.stream;
                import java.util.Properties;
                import org.apache.kafka.streams.KafkaStreams;
                import org.apache.kafka.streams.StreamsConfig;
                import org.apache.kafka.streams.processor.Processor;
                import org.apache.kafka.streams.processor.ProcessorSupplier;
                import org.apache.kafka.streams.processor.TopologyBuilder;
                public class Application {
                  public static void main(String[] args) {
                    // 定义输入的 topic
                    String from = "first";
                    // 定义输出的 topic
                    String to = "second";
                    // 设置参数
                    Properties settings = new Properties();
                    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
                    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
                    StreamsConfig config = new StreamsConfig(settings);
                    // 构建拓扑
                    //0.10 版本如下
                    //TopologyBuilder builder = new TopologyBuilder();
                    //2.0 版本如下
                    Topology builder = new Topology();
                    builder.addSource("SOURCE", from).addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
                    @Override
                    public Processor<byte[], byte[]> get() {
                      // 具体分析处理
                      return new LogProcessor();
                      }
                    }, "SOURCE").addSink("SINK", to, "PROCESS");
                      // 创建 kafka stream
                    KafkaStreams streams = new KafkaStreams(builder, config);
                    streams.start();
                    }
                  }
              2)具体业务处理
    package cn.bw.kafka.stream;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    public class LogProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;
    @Override
    public void init(ProcessorContext context) {
    this.context = context;
    }
    @Override
    public void process(byte[] key, byte[] value) {
    String input = new String(value);
    // 如果包含“>>>”则只保留该标记后面的内容
    if (input.contains(">>>")) {
    input = input.split(">>>")[1].trim();
    // 输出到下一个 topic
    context.forward("logProcessor".getBytes(), input.getBytes());
    }else{
    context.forward("logProcessor".getBytes(), input.getBytes());
    }
    }
    @Override
    public void punctuate(long timestamp) {
    }
    @Override
    public void close() {
    }
    }
              4) 将程序用 eclipse 插件打成 jar 包
              5) 创建 first topic
                  ./kafka-topics.sh --create --topic first --replication-factor 2 --partitions 3 --zookeeper node01:2181
              6) 创建 second topic
                  ./kafka-topics.sh --create --topic second --replication-factor 2 --partitions 3 --zookeeper node01:2181
              7) 验证 topic 是否创建完成
                  ./kafka-topics.sh --list --zookeeper node01:2181
              8)将 jar 包拷贝 node01 上运行  
                    [hadoop@node01 kafka]$ java -jar kafka0508_fat.jar cn.bw.kafka.stream.Application
                    注:maven 插件如果没有指定运行主类使用下面的命令运行:
                    java -cp processor.jar com.bw.kafka.streams.LogFilter
            5)在 node02 上启动生产者
                [hadoop@node02 kafka]$ bin/kafka-console-producer.sh --broker-list node02:9092 --topic first
                >hello>>>world
                >h>>>hadoop
                >hahaha
            6)在 node03 上启动消费者
                [hadoop@node03 kafka]$ bin/kafka-console-consumer.sh --zookeeper node03:2181 --from-beginning --topic second
                world
                hadoop
                hahaha
  • 相关阅读:
    国家语言,语言代码,locale id对应表
    SpringMVC,SpringBoot文件下载
    lombok使用基础教程
    博客园主题修改中用到的css属性
    Hexo next博客添加折叠块功能添加折叠代码块
    IntelIj IDEA运行JUnit Test OutOfMemoryError
    Spring Boot-JPA、Hibernate、Spring data jpa之间的关系
    IntelliJ IDEA-设置字体大小
    win10-mysql卸载干净
    IntelliJ IDEA Check out from git
  • 原文地址:https://www.cnblogs.com/JBLi/p/11555941.html
Copyright © 2011-2022 走看看