zoukankan      html  css  js  c++  java
  • KafkaStream低级别API

    开发者可以通过Processor接口来实现自己的自定义处理逻辑。接口提供了Process和Punctuate方法。

    其中:Process方法用于处理接受到的消息

    Punctuate方法指定时间间隔周期性的执行,用于处理周期数据:例如某些状态值计算生成 新的流。

    Processor接口还提供了init方法,init初始化方法可以将ProcessorContext转存到Procesor实例中,以供Prounctute使用。

    可以使用context的schedule方法实现punctute的周期性调用。

    将修改后的数据转存到下游处理节点:context.().forward

    体检当前处理节点的处理进度:context.commit.

    代码实例如下:

    public class MyProcessor extends Processor {
    
            private ProcessorContext context;
    
            private KeyValueStore kvStore;
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                this.context = context;
                this.context.schedule(1000);
                this.kvStore = (KeyValueStore) context.getStateStore("Counts");
            }
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase().split(" ");
                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);
                    if (oldValue == null) {
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }
            @Override
            public void punctuate(long timestamp) {
                KeyValueIterator iter = this.kvStore.all();
                while (iter.hasNext()) {
                    KeyValue entry = iter.next();
                    context.forward(entry.key, entry.value.toString());
                }
                iter.close();
                context.commit();
            }
            @Override
            public void close() {
                this.kvStore.close();
            }
        };
    

      

    在上边的代码中:

    1、 init方法,定义了每秒调用punctuate方法,将名称为count的状态存储结构中转存到奔processor处理节点中。

    2、 在process方法中,每接受到一条消息,将字符串进行拆分,并更新到状态存储中,生成新的流。

    3、 在puncuate方法中,迭代本地状态存储并将流提交到下个处理节点进行处理。

    1.1    Processor Topology(处理器拓扑)

    通过Processor API定义的自定义的处理器,开发人员将使用TopologyBuilder通过连接这些处理器共同构建一个处理器拓扑。(类似于主方法)

    首先,所有的源节点命名为“SOURCE”并使用addSource方法添加到拓扑中,主题“src-topic”来提供记录(消息)。

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
    

    3个processor节点,使用addProcessor方法添加;这里的第一个processor是”SOURCE”节点的子节点,但是其他两个处理器的父类。

    最后,使用addSink方法将3个sink节点添加到完整的拓扑中。每个管道从不同父类处理器节点输出到不同的topic。

    1.2    本地状态存储

     请注意,Processor API不仅限于当有消息到达时候调用process()方法,也可以保存记录到本地状态仓库(如汇总或窗口连接)。利用这个特性,开发者可以使用StateStore接口定义一个状态仓库(Kafka Streams库也有一些扩展的接口,如KeyValueStore)。在实际开发中,开发者通常不需要从头开始自定义这样的状态仓库,可以很简单使用Stores工厂来设定状态仓库是持久化的或日志备份等。在下面的例子中,创建一个名为”Counts“的持久化的key-value仓库,key类型String和value类型Long。

    StateStoreSupplier countStore = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build();

    为了利用这些状态仓库,开发者可以在构建处理器拓扑时使用TopologyBuilder.addStateStore方法来创建本地状态,并将它与需要访问它的处理器节点相关联,或者也可以通过

    TopologyBuilder.connectProcessorAndStateStores将创建的状态仓库与现有的处理器节点连接。
      TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "src-topic")
            .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
            // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
            .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
            .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
            // connect the state store "COUNTS" with processor "PROCESS2"
            .connectProcessorAndStateStores("PROCESS2", "COUNTS");
            .addSink("SINK1", "sink-topic1", "PROCESS1")
            .addSink("SINK2", "sink-topic2", "PROCESS2")
            .addSink("SINK3", "sink-topic3", "PROCESS3");
    

      

  • 相关阅读:
    springmvc
    POJ 3683 Priest John's Busiest Day
    POJ 3678 Katu Puzzle
    HDU 1815 Building roads
    CDOJ UESTC 1220 The Battle of Guandu
    HDU 3715 Go Deeper
    HDU 3622 Bomb Game
    POJ 3207 Ikki's Story IV
    POJ 3648 Wedding
    HDU 1814 Peaceful Commission
  • 原文地址:https://www.cnblogs.com/likethis/p/9988908.html
Copyright © 2011-2022 走看看