zoukankan      html  css  js  c++  java
  • kafka-stream数据清洗

    1、数据清洗业务类LogProcessor

    package com.css.kafka.kafka_stream;
    
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    
    /**
     * 数据清洗*/
    public class LogProcessor implements Processor<byte[], byte[]>{
    
        private ProcessorContext context;
        
        //初始化
        public void init(ProcessorContext context) {
            //传输
            this.context = context;
        }
    
        //具体业务逻辑
        public void process(byte[] key, byte[] value) {
            //1.拿到消息数据,转成字符串
            String message = new String(value);
            
            //2.如果包含-  去除
            if (message.contains("-")) {
                //3.把- 去掉 之后去掉左侧数据
                message = message.split("-")[1];
            }
            //4.发送数据
            context.forward(key, message.getBytes());
        }
    
        //释放资源
        public void close() {
        }
    }

    2、Application类

    package com.css.kafka.kafka_stream;
    
    import java.util.Properties;
    
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    
    /**
     * 需求:对数据进行清洗操作
     * 
     * 思路:wo-henshuai  把-和wo清洗掉*/
    public class Application {
    
        public static void main(String[] args) {
            //1.定义主题 发送到 另外一个主题中 数据清洗
            String oneTopic = "t1";
            String twoTopic = "t2";
            
            //2.设置属性
            Properties prop = new Properties();
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092");
            
            //3.实例对象
            StreamsConfig config = new StreamsConfig(prop);
            
            //4.流计算 拓扑
            Topology builder = new Topology();
            
            //5.定义kafka组件数据源
            builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {
    
                public Processor<byte[], byte[]> get() {
                    return new LogProcessor();
                }
                //从哪里来
            }, "Source")
            //到哪里去
            .addSink("Sink", twoTopic, "Processor");
    
            //6.实例化kafkaStream
            KafkaStreams kafkaStreams = new KafkaStreams(builder, prop);
            kafkaStreams.start();
        }
    }

    3、运行Application类的main方法

    4、在hd09-1机器上创建主题t1

    bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partition 1 --topic t1

    5、在hd09-2机器上启动消费者

    bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties

    6、在hd09-1机器上启动生产者

    bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1

    7、此时在hd09-1机器kafka生产者上输入  wo-henshuai,在hd09-2消费者机器上会显示henshuai,即完成了数据清洗,如下图。

  • 相关阅读:
    二叉查找树的简易实现
    二叉树的部分简单实现
    二叉树的遍历(基于栈的非递归方式实现)
    简易学生成绩管理管理系统(java描述)
    简易的学生成绩管理系统(C++实现)
    Android中使用ExpandableListView实现微信通讯录界面(完善仿微信APP)
    JAVA环境变量和TomCat服务器配置
    Android中ListView实现图文并列并且自定义分割线(完善仿微信APP)
    Android中Fragment和ViewPager那点事儿(仿微信APP)
    Android中通过ActionBar为标题栏添加搜索以及分享视窗
  • 原文地址:https://www.cnblogs.com/areyouready/p/10139806.html
Copyright © 2011-2022 走看看