zoukankan      html  css  js  c++  java
  • kafka-stream数据清洗

    1、数据清洗业务类LogProcessor

    package com.css.kafka.kafka_stream;
    
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    
    /**
     * 数据清洗*/
    public class LogProcessor implements Processor<byte[], byte[]>{
    
        private ProcessorContext context;
        
        //初始化
        public void init(ProcessorContext context) {
            //传输
            this.context = context;
        }
    
        //具体业务逻辑
        public void process(byte[] key, byte[] value) {
            //1.拿到消息数据,转成字符串
            String message = new String(value);
            
            //2.如果包含-  去除
            if (message.contains("-")) {
                //3.把- 去掉 之后去掉左侧数据
                message = message.split("-")[1];
            }
            //4.发送数据
            context.forward(key, message.getBytes());
        }
    
        //释放资源
        public void close() {
        }
    }

    2、Application类

    package com.css.kafka.kafka_stream;
    
    import java.util.Properties;
    
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    
    /**
     * 需求:对数据进行清洗操作
     * 
     * 思路:wo-henshuai  把-和wo清洗掉*/
    public class Application {
    
        public static void main(String[] args) {
            //1.定义主题 发送到 另外一个主题中 数据清洗
            String oneTopic = "t1";
            String twoTopic = "t2";
            
            //2.设置属性
            Properties prop = new Properties();
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092");
            
            //3.实例对象
            StreamsConfig config = new StreamsConfig(prop);
            
            //4.流计算 拓扑
            Topology builder = new Topology();
            
            //5.定义kafka组件数据源
            builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {
    
                public Processor<byte[], byte[]> get() {
                    return new LogProcessor();
                }
                //从哪里来
            }, "Source")
            //到哪里去
            .addSink("Sink", twoTopic, "Processor");
    
            //6.实例化kafkaStream
            KafkaStreams kafkaStreams = new KafkaStreams(builder, prop);
            kafkaStreams.start();
        }
    }

    3、运行Application类的main方法

    4、在hd09-1机器上创建主题t1

    bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partition 1 --topic t1

    5、在hd09-2机器上启动消费者

    bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties

    6、在hd09-1机器上启动生产者

    bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1

    7、此时在hd09-1机器kafka生产者上输入  wo-henshuai,在hd09-2消费者机器上会显示henshuai,即完成了数据清洗,如下图。

  • 相关阅读:
    转:无线AP模式之无线AP Client客户端模式的应用体验(一)
    Bridge mode
    无线组网(六)——11n无线路由器WDS功能应用举例
    NETGEAR无线路由器WDS功能介绍
    Memory Access vs CPU Speed_你真的了解CPU和内存吗?
    Readyboost功能
    分析:新技术解决服务器内三大I/O瓶颈
    TPLINK Mini系列无线路由器设置指南(三)——Repeater模式
    smartbit网络性能测试介绍
    低端路由器和高端路由的区别
  • 原文地址:https://www.cnblogs.com/areyouready/p/10139806.html
Copyright © 2011-2022 走看看