zoukankan      html  css  js  c++  java
  • KafkaStream低级别API

    开发者可以通过Processor接口来实现自己的自定义处理逻辑。接口提供了Process和Punctuate方法。

    其中:Process方法用于处理接受到的消息

    Punctuate方法指定时间间隔周期性的执行,用于处理周期数据:例如某些状态值计算生成 新的流。

    Processor接口还提供了init方法,init初始化方法可以将ProcessorContext转存到Procesor实例中,以供Prounctute使用。

    可以使用context的schedule方法实现punctute的周期性调用。

    将修改后的数据转存到下游处理节点:context.().forward

    体检当前处理节点的处理进度:context.commit.

    代码实例如下:

    public class MyProcessor extends Processor {
    
            private ProcessorContext context;
    
            private KeyValueStore kvStore;
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                this.context = context;
                this.context.schedule(1000);
                this.kvStore = (KeyValueStore) context.getStateStore("Counts");
            }
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase().split(" ");
                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);
                    if (oldValue == null) {
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }
            @Override
            public void punctuate(long timestamp) {
                KeyValueIterator iter = this.kvStore.all();
                while (iter.hasNext()) {
                    KeyValue entry = iter.next();
                    context.forward(entry.key, entry.value.toString());
                }
                iter.close();
                context.commit();
            }
            @Override
            public void close() {
                this.kvStore.close();
            }
        };
    

      

    在上边的代码中:

    1、 init方法,定义了每秒调用punctuate方法,将名称为count的状态存储结构中转存到奔processor处理节点中。

    2、 在process方法中,每接受到一条消息,将字符串进行拆分,并更新到状态存储中,生成新的流。

    3、 在puncuate方法中,迭代本地状态存储并将流提交到下个处理节点进行处理。

    1.1    Processor Topology(处理器拓扑)

    通过Processor API定义的自定义的处理器,开发人员将使用TopologyBuilder通过连接这些处理器共同构建一个处理器拓扑。(类似于主方法)

    首先,所有的源节点命名为“SOURCE”并使用addSource方法添加到拓扑中,主题“src-topic”来提供记录(消息)。

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
    

    3个processor节点,使用addProcessor方法添加;这里的第一个processor是”SOURCE”节点的子节点,但是其他两个处理器的父类。

    最后,使用addSink方法将3个sink节点添加到完整的拓扑中。每个管道从不同父类处理器节点输出到不同的topic。

    1.2    本地状态存储

     请注意,Processor API不仅限于当有消息到达时候调用process()方法,也可以保存记录到本地状态仓库(如汇总或窗口连接)。利用这个特性,开发者可以使用StateStore接口定义一个状态仓库(Kafka Streams库也有一些扩展的接口,如KeyValueStore)。在实际开发中,开发者通常不需要从头开始自定义这样的状态仓库,可以很简单使用Stores工厂来设定状态仓库是持久化的或日志备份等。在下面的例子中,创建一个名为”Counts“的持久化的key-value仓库,key类型String和value类型Long。

    StateStoreSupplier countStore = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build();

    为了利用这些状态仓库,开发者可以在构建处理器拓扑时使用TopologyBuilder.addStateStore方法来创建本地状态,并将它与需要访问它的处理器节点相关联,或者也可以通过

    TopologyBuilder.connectProcessorAndStateStores将创建的状态仓库与现有的处理器节点连接。
      TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "src-topic")
            .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
            // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
            .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
            .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
            // connect the state store "COUNTS" with processor "PROCESS2"
            .connectProcessorAndStateStores("PROCESS2", "COUNTS");
            .addSink("SINK1", "sink-topic1", "PROCESS1")
            .addSink("SINK2", "sink-topic2", "PROCESS2")
            .addSink("SINK3", "sink-topic3", "PROCESS3");
    

      

  • 相关阅读:
    golang-switch结构辨析有话
    不用中间变量交换变量值-golang版
    vue element ui表单验证不通过,滚动到页面上第一个验证失败的输入框位置
    表单校验中使用v-if和v-else来判断是福哦要校验时的注意项
    如何修改本地项目关联的远程仓库地址
    vue-cli3如何访问public文件夹下的静态资源
    Git 命令行的各种退出方式
    elementui表格如何在表头每个列标题后面插入图片用于插入tooltip
    js 把一个二叉树类型的对象转化为普通对象
    element-ui树结构设置默认选中节点时改变传入的数组树结构没有变化
  • 原文地址:https://www.cnblogs.com/likethis/p/9988908.html
Copyright © 2011-2022 走看看