zoukankan      html  css  js  c++  java
  • flink ETL数据处理

                          Flink ETL 实现数据清洗 

      

     一:需求(针对算法产生的日志数据进行清洗拆分)

      1. 算法产生的日志数据是嵌套json格式,需要拆分

      2. 将日志中的国家字段(countryCode)转换为对应的大区(area)

      3.最后把不同类型的日志数据分别进行储存

    二:整体架构 

          这里演示从 RabbitMQ 读取数据,进行清洗处理,再把结果发送回 RabbitMQ;国家与大区的映射关系从 Redis 中读取。

     自定义 RedisSource(Flink 没有内置的 Redis Source,需要自己实现)

    package com.yw.source;
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.exceptions.JedisConnectionException;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Non-parallel Flink source that periodically reads the country-to-area mapping from Redis.
     * <p>
     * The mapping is stored in the Redis hash {@code areas}: each field is an area code and its
     * value is a comma-separated list of country codes, e.g.
     * hset areas AREA_IN IN
     * hset areas AREA_US US
     * hset areas AREA_CT TW,HK
     * hset areas AREA_AR PK,KW,SA
     * <p>
     * Every 60 seconds the hash is read and inverted into a {@code countryCode -> areaCode} map.
     * The map is emitted as a NEW {@link HashMap} instance, so an already-emitted map is never
     * mutated afterwards. An empty read is logged and nothing is emitted. On a connection error
     * the client is recreated and the read is retried at the next poll.
     *
     * @Auther: YW
     * @Date: 2019/6/15 10:23
     * @Description:
     */
    public class MyRedisSource implements SourceFunction<HashMap<String, String>> {
        private static final Logger LOG = LoggerFactory.getLogger(MyRedisSource.class);

        private static final String DEFAULT_HOST = "localhost";
        // Standard Redis port. The original hard-coded 6397, which looks like a typo of 6379.
        private static final int DEFAULT_PORT = 6379;
        // Redis hash holding area -> "country,country,..." entries.
        private static final String AREAS_KEY = "areas";
        // Poll interval in milliseconds (60 seconds).
        private static final long SLEEP = 60000;

        private final String host;
        private final int port;

        // Cleared by cancel(), which Flink calls from a different thread than run(),
        // so it must be volatile for run() to observe the change.
        private volatile boolean isRuning = true;
        // Created inside run(); transient because the function object is serialized for deployment.
        private transient volatile Jedis jedis = null;

        /** Creates a source that reads from Redis at localhost:6379. */
        public MyRedisSource() {
            this(DEFAULT_HOST, DEFAULT_PORT);
        }

        /**
         * Creates a source that reads from the given Redis instance.
         *
         * @param host Redis host name
         * @param port Redis port
         */
        public MyRedisSource(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
            jedis = new Jedis(host, port);
            try {
                while (isRuning) {
                    pollOnce(ctx);
                    try {
                        // Sleep after failures as well, so a Redis outage does not cause a busy loop.
                        Thread.sleep(SLEEP);
                    } catch (InterruptedException e) {
                        // Interrupted while waiting (e.g. on cancellation): stop polling.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } finally {
                // Also covers a client created by a reconnect that raced with cancel().
                closeConnection();
            }
        }

        /** Reads the mapping once and emits it when non-empty; connection errors trigger a reconnect. */
        private void pollOnce(SourceContext<HashMap<String, String>> ctx) {
            Jedis client = jedis;
            if (client == null) {
                // Already closed by cancel().
                return;
            }
            try {
                HashMap<String, String> map = readCountryToArea(client);
                if (map.isEmpty()) {
                    LOG.warn("获取数据为空!");
                } else {
                    ctx.collect(map);
                }
            } catch (JedisConnectionException e) {
                // Log the exception itself (not getCause(), which may be null) to keep the stack trace.
                LOG.error("redis连接异常 重新连接", e);
                // Do not reopen if the failure was caused by cancel() closing the client.
                if (isRuning) {
                    // Close the broken client before replacing it so its socket is not leaked.
                    closeConnection();
                    jedis = new Jedis(host, port);
                }
            } catch (Exception e) {
                LOG.error("redis Source其他异常", e);
            }
        }

        /** Inverts the Redis hash {@code area -> "c1,c2,..."} into a fresh {@code country -> area} map. */
        private static HashMap<String, String> readCountryToArea(Jedis client) {
            HashMap<String, String> map = new HashMap<>();
            for (Map.Entry<String, String> entry : client.hgetAll(AREAS_KEY).entrySet()) {
                for (String country : entry.getValue().split(",")) {
                    String code = country.trim();
                    // Tolerate "TW, HK" and stray commas; a blank string is not a country code.
                    if (!code.isEmpty()) {
                        map.put(code, entry.getKey());
                    }
                }
            }
            return map;
        }

        /** Closes the current client, if any, and clears the field. */
        private void closeConnection() {
            Jedis current = jedis;
            jedis = null;
            if (current != null) {
                try {
                    current.close();
                } catch (Exception e) {
                    LOG.warn("failed to close redis connection", e);
                }
            }
        }

        @Override
        public void cancel() {
            isRuning = false;
            // Close once. The original `while (jedis != null) jedis.close();` never terminated
            // because close() does not null the field.
            closeConnection();
        }
    }
    DataClean数据处理
    package com.yw;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONException;
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.AMQP;
    import com.yw.source.MyRedisSource;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.HashMap;
    
    /**
     * Flink ETL job that cleans algorithm log records read from RabbitMQ.
     * <p>
     * Each input message looks like
     * {"dt":"2019-06-10","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},...]}.
     * It is split into one output record per element of {@code data}. Each output record is
     * enriched with {@code dt} and with the {@code area} looked up from the country-to-area
     * mapping emitted by {@link MyRedisSource}. Output records are published to exchange
     * {@code test.flink.output} with routing key {@code CC}.
     *
     * @Auther: YW
     * @Date: 2019/6/15 10:09
     * @Description:
     */
    public class DataClean {
        private static final Logger LOG = LoggerFactory.getLogger(DataClean.class);

        // Input queue name.
        public final static String QUEUE_NAME = "two.aa.in";

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            configureCheckpointing(env);

            final RMQConnectionConfig rmqConf = new RMQConnectionConfig.Builder()
                    .setHost("127.0.0.1")
                    .setPort(5672)
                    .setVirtualHost("/")
                    .setUserName("guest")
                    .setPassword("guest")
                    .build();

            // usesCorrelationId=false: RMQSource can only deduplicate redelivered messages when
            // correlation ids are used, so this source is at-least-once even though checkpointing
            // is EXACTLY_ONCE. The source must keep parallelism 1.
            DataStream<String> data1 = env.addSource(
                    new RMQSource<String>(rmqConf, QUEUE_NAME, false, new SimpleStringSchema()))
                    .setParallelism(1);

            DataStreamSource<HashMap<String, String>> mapData = env.addSource(new MyRedisSource());

            // broadcast(): every parallel instance of the enrichment operator needs every mapping
            // update. Without it, the non-parallel Redis source feeds a higher-parallelism operator
            // round-robin, so each update would reach only one instance.
            DataStream<String> streamOperator = data1.connect(mapData.broadcast()).flatMap(new AreaEnricher());

            streamOperator.addSink(new RMQSink<String>(rmqConf, new SimpleStringSchema(), new RMQSinkPublishOptions<String>() {
                @Override
                public String computeRoutingKey(String s) {
                    return "CC";
                }

                @Override
                public AMQP.BasicProperties computeProperties(String s) {
                    // No custom message properties.
                    return null;
                }

                @Override
                public String computeExchange(String s) {
                    return "test.flink.output";
                }
            }));
            // Debug output of the raw input messages.
            data1.print();
            env.execute("etl");
        }

        /** Enables a checkpoint every minute with exactly-once mode, retained on cancellation. */
        private static void configureCheckpointing(StreamExecutionEnvironment env) {
            env.enableCheckpointing(60000);
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // At least 30 s (half the interval) between the end of one checkpoint and the start of the next.
            checkpointConfig.setMinPauseBetweenCheckpoints(30000);
            // A checkpoint not completed within 10 s is discarded.
            checkpointConfig.setCheckpointTimeout(10000);
            checkpointConfig.setMaxConcurrentCheckpoints(1);
            checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }

        /**
         * Splits each log message (input 1) into one JSON string per {@code data} element, adding
         * {@code dt} and {@code area}. The country-to-area mapping arrives on input 2.
         */
        private static final class AreaEnricher
                implements CoFlatMapFunction<String, HashMap<String, String>, String> {
            // Latest country -> area mapping from Redis, replaced wholesale on each update.
            // It is not checkpointed, so it is empty until the first update after (re)start;
            // records processed before that get a null area.
            private HashMap<String, String> allMap = new HashMap<String, String>();

            // Handles log messages from RabbitMQ.
            @Override
            public void flatMap1(String value, Collector<String> out) {
                JSONObject record;
                JSONArray items;
                try {
                    record = JSONObject.parseObject(value);
                    items = record == null ? null : record.getJSONArray("data");
                } catch (JSONException e) {
                    // Skip one bad message instead of failing (and endlessly restarting) the job.
                    LOG.warn("skipping malformed record: {}", value, e);
                    return;
                }
                if (items == null) {
                    LOG.warn("skipping record without a data array: {}", value);
                    return;
                }
                String dt = record.getString("dt");
                // null for countries that have no entry in the mapping.
                String area = allMap.get(record.getString("countryCode"));
                for (int i = 0; i < items.size(); i++) {
                    Object element = items.get(i);
                    if (!(element instanceof JSONObject)) {
                        LOG.warn("skipping non-object data element {} in record: {}", i, value);
                        continue;
                    }
                    JSONObject item = (JSONObject) element;
                    item.put("area", area);
                    item.put("dt", dt);
                    out.collect(item.toJSONString());
                }
            }

            // Handles mapping updates from Redis.
            @Override
            public void flatMap2(HashMap<String, String> value, Collector<String> out) {
                this.allMap = value;
            }
        }
    }

    rabbitmq 模拟数据

    package com.yw;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Random;
    
    /**
     * @Auther: YW
     * @Date: 2019/6/5 14:57
     * @Description:
     */
    public class RabbitMQProducerUtil {
        public final static String QUEUE_NAME = "two.aa.in";
    
        public static void main(String[] args) throws Exception {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            //设置RabbitMQ相关信息
            factory.setHost("127.0.0.1");
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            factory.setPort(5672);
    
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            // 声明一个队列
    //        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //发送消息到队列中
            String message = "{"dt":""+getCurrentTime()+"","countryCode":""+getCountryCode()+""," +
                    "{"type":""+getType()+"","score":"+getScore()+""level":""+getLevel()+""}," +
                    "{"type":""+getType()+"","score":"+getScore()+""level":""+getLevel()+""}," +
                    "{"type":""+getType()+"","score":"+getScore()+""level":""+getLevel()+""}]}";
    
            //我们这里演示发送一千条数据
            for (int i = 0; i < 20; i++) {
                    channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));
                  System.out.println("Producer Send +'" + message);
            }
    
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    
        public static String getCurrentTime() {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return sdf.format(new Date());
        }
    
        public static String getCountryCode() {
            String[] types={"US","TN","HK","PK","KW","SA","IN"};
            Random random = new Random();
            int i = random.nextInt(types.length);
            return types[i];
        }
    
        public static String getType() {
            String[] types={"s1","s2","s3","s4","s5"};
            Random random = new Random();
            int i = random.nextInt(types.length);
            return types[i];
        }
    
        public static String getScore() {
            String[] types={"0.1","0.2","0.3","0.4","0.5"};
            Random random = new Random();
            int i = random.nextInt(types.length);
            return types[i];
        }
        public static String getLevel() {
            String[] types={"A","B","C","D","E"};
            Random random = new Random();
            int i = random.nextInt(types.length);
            return types[i];
        }
    }

    redis 初始化数据(在 redis-cli 中执行):

    hset areas AREA_IN IN
    hset areas AREA_US US
    hset areas AREA_CT TW,HK
    hset areas AREA_AR PK,KW,SA

    ------------最后运行DataClean------------

  • 相关阅读:
    OnClose()和 OnDestroy()
    非计算机专业人员如何成为专业人员的学习之路
    【轉】白话文、简化字白化了文化,简化了思想
    【轉】研制汉字计算机的意义和可能性
    【轉】智慧的辨思:最优秀的语种 汉语
    安卓命令行操作备忘
    virtualbox 安装 android 经验总结
    【转】HTML5 LocalStorage 本地存储
    iOS开发之 获取手机的网络的ip地址
    iOS开发之 Xcode 6 创建一个Empty Application
  • 原文地址:https://www.cnblogs.com/YuanWeiBlogger/p/11815828.html
Copyright © 2011-2022 走看看