zoukankan      html  css  js  c++  java
  • Kafka Streams 低级别的 Processor API 动态生成拓扑图

    /**
     * Example entry point that wires a Kafka Streams topology with the low-level
     * Processor API: one source topic, two chained processors and one sink topic.
     *
     * <p>The first processor is obtained through {@code Factory.getProcessor()},
     * which instantiates it reflectively; the second one is supplied with a
     * constructor reference.
     */
    public class KafkaSream {

        /**
         * Builds the stream configuration and the topology, then starts the
         * streams application.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {

            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            StreamsConfig config = new StreamsConfig(props);

            KStreamBuilder builder = new KStreamBuilder();
            //builder.stream("my-topic").mapValues(value -> value.toString()+"gyw").to("my-topics");

            // Supplier for PROCESS1: delegates to the reflective Factory.
            ProcessorSupplier dynamicSupplier = new ProcessorSupplier() {
                @Override
                public Processor get() {
                    try {
                        return Factory.getProcessor();
                    } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                        // Fail loudly with the cause attached instead of handing a
                        // null processor to the topology, which would only surface
                        // later as an unrelated-looking failure.
                        throw new IllegalStateException("Unable to create the processor for node PROCESS1", e);
                    }
                }
            };

            builder.addSource("SOURCE", "chinaws__contents")
                    // First processor: arg1 = node name, arg2 = processor supplier, arg3 = parent node name.
                    .addProcessor("PROCESS1", dynamicSupplier, "SOURCE")
                    // Second processor: arg1 = node name, arg2 = processor supplier, arg3 = parent node name.
                    .addProcessor("PROCESS2", MyProcessor2::new, "PROCESS1")
                    // Optional third processor, same argument layout.
                    // .addProcessor("PROCESS3", MyProcessorC::new, "PROCESS2")

                    // Finally the sink: arg1 = sink name, arg2 = output topic, arg3 = processor whose output it receives.
                    .addSink("SINK1", "topicA", "PROCESS2");
                    //.addSink("SINK2", "topicB", "PROCESS2")
                    //.addSink("SINK3", "topicC", "PROCESS3");

            KafkaStreams streams = new KafkaStreams(builder, config);
            // Close the streams instance when the JVM shuts down (e.g. on Ctrl-C).
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        }
    }
    package com.bonc.kafka;
    
    import org.apache.kafka.streams.processor.Processor;
    
    /**
     * Creates {@link Processor} instances reflectively from a hard-coded class name.
     */
    public class Factory {

        /** Fully-qualified name of the processor implementation that is loaded. */
        private static final String PROCESSOR_CLASS_NAME = "com.bonc.kafka.MyProcessor";

        /**
         * Loads the processor class by name and instantiates it through its
         * no-argument constructor.
         *
         * <p>Uses {@code getDeclaredConstructor().newInstance()} rather than the
         * deprecated {@code Class.newInstance()}, and verifies that the class is a
         * {@link Processor} before creating an instance.
         *
         * @return a new processor instance
         * @throws ClassNotFoundException if the processor class is not on the classpath
         * @throws IllegalAccessException if the class or its no-argument constructor is not accessible
         * @throws InstantiationException if the class is abstract, has no no-argument
         *         constructor, or its constructor threw a checked exception (attached as the cause)
         * @throws ClassCastException if the loaded class does not implement {@link Processor}
         */
        public static Processor getProcessor() throws ClassNotFoundException, IllegalAccessException, InstantiationException {

            Class<? extends Processor> type = Class.forName(PROCESSOR_CLASS_NAME).asSubclass(Processor.class);

            try {
                return type.getDeclaredConstructor().newInstance();
            } catch (NoSuchMethodException e) {
                // Class.newInstance() reported a missing no-arg constructor as
                // InstantiationException; keep that contract for existing callers.
                throw instantiationFailure(PROCESSOR_CLASS_NAME + " has no no-argument constructor", e);
            } catch (java.lang.reflect.InvocationTargetException e) {
                Throwable cause = e.getCause();
                // Unchecked failures from the constructor propagate unchanged.
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                if (cause instanceof Error) {
                    throw (Error) cause;
                }
                throw instantiationFailure("Constructor of " + PROCESSOR_CLASS_NAME + " threw an exception", cause);
            }
        }

        /** Builds an InstantiationException that carries the given cause. */
        private static InstantiationException instantiationFailure(String message, Throwable cause) {
            InstantiationException failure = new InstantiationException(message);
            failure.initCause(cause);
            return failure;
        }
    }

     方法二:实现 ProcessorSupplier,根据传入的类名动态创建 Processor:

    package com.bonc.kafka;
    
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    
    /**
     * {@link ProcessorSupplier} that creates its {@link Processor} reflectively
     * from a fully-qualified class name given at construction time.
     */
    public class SupplierFactory implements ProcessorSupplier {

        /** Fully-qualified name of the processor class to instantiate. */
        private final String className;

        /**
         * @param className fully-qualified name of a class that implements
         *                  {@link Processor} and has a no-argument constructor
         */
        public SupplierFactory(String className){
            this.className = className;
        }

        /**
         * Loads the configured class and returns a new instance of it.
         *
         * @return a new processor instance, never {@code null}
         * @throws IllegalStateException if the class cannot be found, is not
         *         accessible, cannot be instantiated, or its constructor throws;
         *         the original failure is attached as the cause
         * @throws ClassCastException if the class does not implement {@link Processor}
         */
        @Override
        public Processor get() {
            try {
                return Class.forName(className)
                        .asSubclass(Processor.class)
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException e) {
                // Surface the failure to the caller instead of printing it and
                // returning null.
                throw new IllegalStateException("Cannot create processor of class " + className, e);
            }
        }
    }
    
    
    添加拓扑:.addProcessor("PROCESS1", new SupplierFactory("com.bonc.kafka.MyProcessor"), "SOURCE")
     
    欢迎指正,交流沟通,共同进步!对您有帮助的话点下推荐~~
  • 相关阅读:
    POJ3094 UVALive3594 HDU2734 ZOJ2812 Quicksum【进制】
    UVALive5583 UVA562 Dividing coins
    POJ1979 HDU1312 Red and Black【DFS】
    POJ1979 HDU1312 Red and Black【DFS】
    POJ2386 Lake Counting【DFS】
    POJ2386 Lake Counting【DFS】
    HDU4394 Digital Square
    HDU4394 Digital Square
    UVA213 UVALive5152 Message Decoding
    UVA213 UVALive5152 Message Decoding
  • 原文地址:https://www.cnblogs.com/gaoyawei/p/8144491.html
Copyright © 2011-2022 走看看