zoukankan      html  css  js  c++  java
  • Kafka Streams 低级别的 Processor API 动态生成拓扑图

    /**
     * Entry point that builds and starts a Kafka Streams topology with the
     * low-level Processor API.
     *
     * <p>The topology reads from the {@code chinaws__contents} topic, routes records
     * through {@code PROCESS1} (a processor obtained from {@code Factory.getProcessor()})
     * and {@code PROCESS2} ({@code MyProcessor2}), and writes to {@code topicA}.
     */
    public class KafkaSream {

        /**
         * Configures the application, assembles the topology and starts the streams instance.
         *
         * @param args command-line arguments; not used
         */
        public static void main(String[] args) {

            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            StreamsConfig config = new StreamsConfig(props);

            KStreamBuilder builder = new KStreamBuilder();
            //builder.stream("my-topic").mapValues(value -> value.toString()+"gyw").to("my-topics");

            // Supplier for PROCESS1: each call asks Factory for a reflectively created processor.
            ProcessorSupplier p = new ProcessorSupplier() {
                @Override
                public Processor get() {
                    try {
                        return Factory.getProcessor();
                    } catch (ReflectiveOperationException e) {
                        // Fail fast with the original cause attached. Returning null here would
                        // hide the root cause and let the failure surface later, far from its origin.
                        throw new IllegalStateException("Unable to create the processor for PROCESS1", e);
                    }
                }
            };

            builder.addSource("SOURCE", "chinaws__contents")
                    // First processor: arg1 = processor name, arg2 = processor supplier, arg3 = parent node name
                    .addProcessor("PROCESS1", p, "SOURCE")
                    // Second processor: arg1 = processor name, arg2 = processor supplier, arg3 = parent node name
                    .addProcessor("PROCESS2", MyProcessor2::new, "PROCESS1")
                    // Optional third processor: arg1 = processor name, arg2 = processor supplier, arg3 = parent node name
                    // .addProcessor("PROCESS3", MyProcessorC::new, "PROCESS2")

                    // Finally add the sink: arg1 = sink name, arg2 = output topic, arg3 = processor whose output it receives
                    .addSink("SINK1", "topicA", "PROCESS2");
                    //.addSink("SINK2", "topicB", "PROCESS2")
                    //.addSink("SINK3", "topicC", "PROCESS3");

            KafkaStreams streams = new KafkaStreams(builder, config);
            // Close the streams instance when the JVM shuts down (e.g. on SIGTERM / Ctrl-C).
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        }
    }
    package com.bonc.kafka;
    
    import org.apache.kafka.streams.processor.Processor;
    
    /**
     * Creates {@link Processor} instances reflectively from a fixed class name.
     */
    public class Factory {

        /** Fully qualified name of the processor implementation to instantiate. */
        private static final String PROCESSOR_CLASS_NAME = "com.bonc.kafka.MyProcessor";

        /**
         * Loads {@code com.bonc.kafka.MyProcessor} and returns a new instance created
         * through its no-argument constructor.
         *
         * @return a newly created processor
         * @throws ClassNotFoundException if the processor class cannot be found
         * @throws IllegalAccessException if the class or its no-argument constructor is not accessible
         * @throws InstantiationException if the class is abstract, an interface, or has no
         *         no-argument constructor
         * @throws ClassCastException if the loaded class does not implement {@link Processor}
         * @throws IllegalStateException if the constructor throws a checked exception
         */
        public static Processor getProcessor() throws ClassNotFoundException, IllegalAccessException, InstantiationException {

            // asSubclass verifies the type before any instance is created.
            Class<? extends Processor> type =
                    Class.forName(PROCESSOR_CLASS_NAME).asSubclass(Processor.class);

            try {
                // Class.newInstance() is deprecated: it rethrows checked constructor exceptions
                // without declaring them. Constructor.newInstance() wraps them instead.
                return type.getDeclaredConstructor().newInstance();
            } catch (NoSuchMethodException e) {
                // Keep the declared contract: a missing no-arg constructor was reported as
                // InstantiationException by Class.newInstance().
                InstantiationException missingConstructor =
                        new InstantiationException(PROCESSOR_CLASS_NAME + " has no no-argument constructor");
                missingConstructor.initCause(e);
                throw missingConstructor;
            } catch (java.lang.reflect.InvocationTargetException e) {
                Throwable cause = e.getCause();
                // Unchecked failures from the constructor propagate unchanged, as before.
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                if (cause instanceof Error) {
                    throw (Error) cause;
                }
                throw new IllegalStateException("Constructor of " + PROCESSOR_CLASS_NAME + " failed", cause);
            }
        }

    }

     方法二:实现 ProcessorSupplier,按类名通过反射创建 Processor:

    package com.bonc.kafka;
    
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    
    /**
     * A {@link ProcessorSupplier} that creates processors reflectively from a
     * fully qualified class name supplied at construction time.
     */
    public class SupplierFactory implements ProcessorSupplier {

        /** Fully qualified name of the processor class to instantiate. */
        private final String className;

        /**
         * @param className fully qualified name of a class implementing {@link Processor};
         *        must not be {@code null}
         * @throws NullPointerException if {@code className} is {@code null}
         */
        public SupplierFactory(String className){
            this.className = java.util.Objects.requireNonNull(className, "className");
        }

        /**
         * Creates a new processor instance through the class's no-argument constructor.
         *
         * @return a newly created processor, never {@code null}
         * @throws IllegalStateException if the class cannot be loaded or instantiated
         * @throws ClassCastException if the class does not implement {@link Processor}
         */
        @Override
        public Processor get() {
            try {
                return Class.forName(className)
                        .asSubclass(Processor.class)
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException e) {
                // Fail fast with the cause attached instead of logging and returning null,
                // which would only surface later as an unrelated failure.
                throw new IllegalStateException("Unable to instantiate processor class " + className, e);
            }
        }
    }
    
    
    添加拓扑:addProcessor("PROCESS1", new SupplierFactory("com.bonc.kafka.MyProcessor") , "SOURCE")
     
    欢迎指正,交流沟通,共同进步!对您有帮助的话点下推荐~~
  • 相关阅读:
    listview item 动画
    android sqlite blob
    Python3 配置文件(configparser)(转载)
    python之字符串格式化(format)
    PHP模拟发送POST请求之一、HTTP协议头部解析
    用HTML/JS/PHP方式实现页面延时跳转
    用memoization优化递归算法[JS/PHP实现]
    开通博客,记录一下。
    SpringMvc Json LocalDateTime 互转,form urlencoded @ModelAttribute 转换
    Springdata mongodb 版本兼容 引起 Error [The 'cursor' option is required, except for aggregate with the explain argument
  • 原文地址:https://www.cnblogs.com/gaoyawei/p/8144491.html
Copyright © 2011-2022 走看看