  • How a MapReduce job reads data from HBase and writes it into MySQL

    The MapReduce programming model follows a very formulaic code structure. Let's use computing
    the churn rate in user behavior analysis as the example.
    
    1. The map side: the mapper must extend the Mapper class that Hadoop prescribes. For reading
    HBase data there is a ready-made base class, TableMapper; you only have to declare the output
    key and value types (they must match the reducer's input types):
    public class LoseUserMapper extends TableMapper<StatsUserDimensionKey, Text> {
    // ... field and member declarations omitted ...

    The setup method runs before any call to map. For churn, say we want the seven-day churn rate:
    1. In setup, load every composite key + uuid from the day seven days ago into an in-memory map.
    2. In map, build the same composite key + uuid string from today's data.
    3. If map.get(key + uuid) != null, the user was active seven days ago and is active again today.
    4. Call context.write(userDimensionKey, uuidText) to send the pair to the reducer.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            long endDate = TimeUtil.parseString2Long(conf.get(GlobalConstants.END_DATE), TimeUtil.DATE_FORMAT);
            long beginDate = TimeUtil.parseString2Long(conf.get(GlobalConstants.BEGIN_DATE), TimeUtil.DATE_FORMAT);
            dateKey = DateDimensionKey.buildDate(endDate, DateEnum.DAY);
    
            if ((endDate - beginDate) == 7 * GlobalConstants.DAY_OF_MILLISECONDS) {
                channelKpiKey = new KpiDimensionKey(KpiEnum.CHANNEL_SEVEN_DAY_LOSE.name);
                versionKpiKey = new KpiDimensionKey(KpiEnum.VERSION_SEVEN_DAY_LOSE.name);
                areaKpiKey = new KpiDimensionKey(KpiEnum.AREA_SEVEN_DAY_LOSE.name);
            }
    
            if ((endDate - beginDate) == 14 * GlobalConstants.DAY_OF_MILLISECONDS) {
                channelKpiKey = new KpiDimensionKey(KpiEnum.CHANNEL_FOURTEEN_DAY_LOSE.name);
                versionKpiKey = new KpiDimensionKey(KpiEnum.VERSION_FOURTEEN_DAY_LOSE.name);
                areaKpiKey = new KpiDimensionKey(KpiEnum.AREA_FOURTEEN_DAY_LOSE.name);
            }
    
            if ((endDate - beginDate) == 30 * GlobalConstants.DAY_OF_MILLISECONDS) {
                channelKpiKey = new KpiDimensionKey(KpiEnum.CHANNEL_THIRTY_DAY_LOSE.name);
                versionKpiKey = new KpiDimensionKey(KpiEnum.VERSION_THIRTY_DAY_LOSE.name);
                areaKpiKey = new KpiDimensionKey(KpiEnum.AREA_THIRTY_DAY_LOSE.name);
            }
    
            setActiveUserCache(conf);
        }
    
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String uuid = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
            String appId = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_APP)));
            String platformId = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
            String channel = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CHANNEL)));
            String version = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
            String country = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_COUNTRY)));
            String province = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PROVINCE)));
            String city = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CITY)));
            String isp = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_ISP)));
    
            List<StatsUserDimensionKey> userDimensionKeys = getDimensionKeys(appId, platformId, channel, version, country, province, city, isp);
    
            for (StatsUserDimensionKey userDimensionKey : userDimensionKeys) {
                if (activeUserCache.get(userDimensionKey.toString() + GlobalConstants.KEY_SEPARATOR + uuid) != null) {
                    this.uuidText.set(uuid);
                    context.write(userDimensionKey, uuidText);
                }
            }
        }
    
        public void setActiveUserCache(Configuration conf){
            String date = conf.get(GlobalConstants.BEGIN_DATE).replaceAll("-", "");
    
            FilterList filterList = new FilterList();
            filterList.addFilter(
                    new SingleColumnValueFilter(EventLogConstants.EVENT_LOGS_FAMILY_NAME_BYTES,
                            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME),
                            CompareFilter.CompareOp.EQUAL, Bytes.toBytes(EventLogConstants.EventEnum.START.alias)));
            String[] columns = new String[] {
                    EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
                    EventLogConstants.LOG_COLUMN_NAME_APP,
                    EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
                    EventLogConstants.LOG_COLUMN_NAME_CHANNEL,
                    EventLogConstants.LOG_COLUMN_NAME_VERSION,
                    // country and city are read from the scan results below, so they must be
                    // kept by the column filter as well
                    EventLogConstants.LOG_COLUMN_NAME_COUNTRY,
                    EventLogConstants.LOG_COLUMN_NAME_PROVINCE,
                    EventLogConstants.LOG_COLUMN_NAME_CITY,
                    EventLogConstants.LOG_COLUMN_NAME_ISP,
                    EventLogConstants.LOG_COLUMN_NAME_UUID,
            };
            filterList.addFilter(this.getColumnFilter(columns));
    
            Connection conn = null;
            Table table = null;
            ResultScanner rs = null;
            Scan scan = new Scan();
            try {
                conn = ConnectionFactory.createConnection(conf);
                String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + "_" + date;
                table = conn.getTable(TableName.valueOf(tableName));
                scan.setFilter(filterList);
                rs = table.getScanner(scan);
                for (Result r : rs) {
                    String appId = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_APP)));
                    String platformId = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
                    String channel = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CHANNEL)));
                    String version = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
                    String country = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_COUNTRY)));
                    String province = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PROVINCE)));
                    String city = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_CITY)));
                    String isp = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_ISP)));
                    String uuid = Bytes.toString(r.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
    
                    List<StatsUserDimensionKey> userDimensionKeys = getDimensionKeys(appId, platformId, channel, version, country, province, city, isp);
    
                    for (StatsUserDimensionKey userDimensionKey : userDimensionKeys) {
                        activeUserCache.put(userDimensionKey.toString() + GlobalConstants.KEY_SEPARATOR + uuid, 1);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("failed to scan the HBase event-log table", e);
            } finally {
                // close the scanner, table and connection in reverse order of creation
                if (rs != null) {
                    rs.close();
                }
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        // keep only the listed columns (matched as prefixes on the qualifier names)
        private Filter getColumnFilter(String[] columns) {
            int length = columns.length;
            byte[][] filter = new byte[length][];
            for (int i = 0; i < length; i++) {
                filter[i] = Bytes.toBytes(columns[i]);
            }
            return new MultipleColumnPrefixFilter(filter);
        }
    
        private List<StatsUserDimensionKey> getDimensionKeys(String appid, String platformId, String channel, String version, String country, String province, String city, String isp) {
            List<StatsUserDimensionKey> keys = new ArrayList<>();
    
            AppDimensionKey appKey = new AppDimensionKey(appid, AppEnum.valueOfAlias(appid).name);
            List<PlatformDimensionKey> platformKeys = PlatformDimensionKey.buildList(platformId, PlatformEnum.valueOfAlias(platformId).name);
            List<ChannelDimensionKey> channelKeys = ChannelDimensionKey.buildList(channel);
            List<VersionDimensionKey> versionKeys = VersionDimensionKey.buildList(version);
            List<AreaDimensionKey> areaKeys = AreaDimensionKey.buildList(country, province, city);
            List<IspDimensionKey> ispKeys = IspDimensionKey.buildList(isp);
    
            for (PlatformDimensionKey platformKey : platformKeys) {
    
                // app + platform + channel dimensions
                for (ChannelDimensionKey channelKey : channelKeys) {
                    StatsUserDimensionKey userDimensionKey = new StatsUserDimensionKey();
                    StatsCommonDimensionKey commonKey = userDimensionKey.getCommonDimensionKey();
                    commonKey.setDateDimensionKey(dateKey);
                    commonKey.setAppDimensionKey(appKey);
                    commonKey.setPlatformDimensionKey(platformKey);
                    commonKey.setKpiDimensionKey(channelKpiKey);
                    userDimensionKey.setCommonDimensionKey(commonKey);
                    userDimensionKey.setChannelDimensionKey(channelKey);
                    keys.add(userDimensionKey);
                }
                // app + platform + version dimensions
                for (VersionDimensionKey versionKey : versionKeys) {
                    StatsUserDimensionKey userDimensionKey = new StatsUserDimensionKey();
                    StatsCommonDimensionKey commonKey = userDimensionKey.getCommonDimensionKey();
                    commonKey.setDateDimensionKey(dateKey);
                    commonKey.setAppDimensionKey(appKey);
                    commonKey.setPlatformDimensionKey(platformKey);
                    commonKey.setKpiDimensionKey(versionKpiKey);
                    userDimensionKey.setVersionDimensionKey(versionKey);
                    keys.add(userDimensionKey);
                }
                // app + platform + area + ISP dimensions
                for (AreaDimensionKey areaKey : areaKeys) {
                    for (IspDimensionKey ispKey : ispKeys) {
                        StatsUserDimensionKey userDimensionKey = new StatsUserDimensionKey();
                        StatsCommonDimensionKey commonKey = userDimensionKey.getCommonDimensionKey();
                        commonKey.setDateDimensionKey(dateKey);
                        commonKey.setAppDimensionKey(appKey);
                        commonKey.setPlatformDimensionKey(platformKey);
                        commonKey.setKpiDimensionKey(areaKpiKey);
                        userDimensionKey.setAreaDimensionKey(areaKey);
                        userDimensionKey.setIspDimensionKey(ispKey);
                        keys.add(userDimensionKey);
                    }
                }
            }
            return keys;
        }
    }
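
    The post stops at the mapper, so here is a minimal driver sketch of how it could be wired up.
    Class and constant names are reused from the code above; the runner class itself, the job name,
    and the argument handling are illustrative assumptions (LoseUserReducer and
    TransformerOutputFormat are shown further below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class LoseUserRunner {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // the mapper's setup() expects both window boundaries in the configuration
            conf.set(GlobalConstants.BEGIN_DATE, args[0]); // e.g. 2019-02-01
            conf.set(GlobalConstants.END_DATE, args[1]);   // e.g. 2019-02-08

            Job job = Job.getInstance(conf, "lose-user");
            job.setJarByClass(LoseUserRunner.class);

            // read today's (END_DATE) event-log table; the table-per-day naming
            // mirrors what setActiveUserCache does for the BEGIN_DATE table
            String inputTable = EventLogConstants.HBASE_NAME_EVENT_LOGS
                    + "_" + args[1].replaceAll("-", "");
            TableMapReduceUtil.initTableMapperJob(inputTable, new Scan(),
                    LoseUserMapper.class, StatsUserDimensionKey.class, Text.class, job);

            job.setReducerClass(LoseUserReducer.class);
            // the custom OutputFormat that writes to MySQL (shown further below)
            job.setOutputFormatClass(TransformerOutputFormat.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }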





    The reducer: the framework pulls the map output and groups values with the same key together.
    The same uuid can appear several times within one group (and also under different composite
    keys), so we iterate the values and add each uuid to a Set. The size of that set is the number
    of users who were active seven days ago and are still active today, i.e. the retained users,
    from which the seven-day churn can then be derived.
    public class LoseUserReducer extends Reducer<StatsUserDimensionKey, Text, StatsUserDimensionKey, MapWritableValue> {
        private MapWritableValue outputValue = new MapWritableValue();
        private Set<String> unique = new HashSet<String>();
    
        @Override
        protected void reduce(StatsUserDimensionKey key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
            this.unique.clear(); // the Set instance is reused across reduce() calls, so clear it first
            for (Text value : values) {
                this.unique.add(value.toString());
            }
    
            // store the count of distinct uuids under the marker key -1
            MapWritable map = new MapWritable();
            map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
            this.outputValue.setValue(map);
    
            // record which KPI this value belongs to, so the OutputFormat can pick the right SQL
            this.outputValue.setKpi(KpiEnum.valueOfName(key.getCommonDimensionKey().getKpiDimensionKey().getKpiName()));
    
            // emit
            context.write(key, outputValue);
        }
    }
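
    The post never shows the value types themselves. The sketch below is a hypothetical
    reconstruction, inferred only from the calls made in the reducer above and in
    TransformerRecordWriter.write further below (setValue/setKpi here, getKpi/getValue there);
    the field names and the serialization format are assumptions:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    // (in its own file)
    public abstract class BaseStatsValueWritable implements Writable {
        /** Which KPI the value belongs to; read back by the record writer. */
        public abstract KpiEnum getKpi();
    }

    // (in its own file)
    public class MapWritableValue extends BaseStatsValueWritable {
        private MapWritable value = new MapWritable();
        private KpiEnum kpi;

        public MapWritable getValue() { return value; }
        public void setValue(MapWritable value) { this.value = value; }

        public void setKpi(KpiEnum kpi) { this.kpi = kpi; }
        @Override
        public KpiEnum getKpi() { return kpi; }

        @Override
        public void write(DataOutput out) throws IOException {
            value.write(out);
            WritableUtils.writeEnum(out, kpi); // assumes KpiEnum is a plain Java enum
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            value.readFields(in);
            kpi = WritableUtils.readEnum(in, KpiEnum.class);
        }
    }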
    Writing the output to MySQL:
    /**
     * Custom OutputFormat that writes the reduce output to MySQL.
     */
    public class TransformerOutputFormat extends OutputFormat<KeyBaseDimension, BaseStatsValueWritable> {
    
        /**
         * Returns the object that actually writes the data; the RecordWriter is the data emitter.
         * getRecordWriter returns the RecordWriter instance that reduce tasks use to output their
         * key/value pairs. (If the job has no reduce phase, the map tasks write through this
         * instance directly.)
         */
        @Override
        public RecordWriter<KeyBaseDimension, BaseStatsValueWritable> getRecordWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            /**
             * Create the converter over RPC; this matters: it resolves dimension ids
             * from the configuration for the record writer.
             */
            IDimensionConverter converter = DimensionConverterClient.createDimensionConverter(conf);
            Connection conn = null;
    
            try {
                conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
                // disable auto-commit so rows can be committed in batches
                conn.setAutoCommit(false);
            } catch (Exception e) {
                throw new RuntimeException("获取数据库连接失败", e);
            }
            return new TransformerRecordWriter(conn, conf, converter);
        }
    
        /**
         * checkOutputSpecs is called by the JobClient before the job is submitted (before the
         * InputFormat is used to split the input) to validate the job's output specification.
         * FileOutputFormat, for example, uses it to verify that the output path does not exist
         * yet and then creates it, which guarantees that after the job finishes the path
         * contains exactly this job's output and nothing else.
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
            // nothing needs to happen here for a JDBC-backed OutputFormat; at most
            // you could check that the target table exists
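            // (Hypothetical guard, not in the original post:) if you do want a check here,
            // verifying that the report database is reachable is usually enough, e.g.:
            //
            //     try (java.sql.Connection conn = JdbcManager.getConnection(
            //             context.getConfiguration(), GlobalConstants.WAREHOUSE_OF_REPORT)) {
            //         // connection obtained successfully; nothing else to validate here
            //     } catch (Exception e) {
            //         throw new IOException("report database is not reachable", e);
            //     }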
    
        }
    
        /**
         * getOutputCommitter returns an OutputCommitter instance. A FileOutputCommitter is
         * reused here because the framework requires some committer, even though the real
         * output goes to MySQL.
         * @param context
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
            return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
        }
    
        /**
         * The custom record writer.
         */
        public class TransformerRecordWriter extends RecordWriter<KeyBaseDimension, BaseStatsValueWritable> {
    
            private Connection conn = null;
            private Configuration conf = null;
            private IDimensionConverter converter = null;
            private Map<KpiEnum, PreparedStatement> kpiTypeSQLMap = new HashMap<>();
            private Map<KpiEnum, Integer> batchMap = new HashMap<>();
    
            public TransformerRecordWriter(Connection conn, Configuration conf, IDimensionConverter converter) {
                super();
                this.conn = conn;
                this.conf = conf;
                this.converter = converter;
            }
    
            /**
             * Writes the data: when reduce calls context.write, this is the method that runs
             * underneath, turning the reduce output key/value into a row in MySQL.
             * @param key
             * @param value
             * @throws IOException
             * @throws InterruptedException
             */
            @Override
            public void write(KeyBaseDimension key, BaseStatsValueWritable value)
                    throws IOException, InterruptedException {
    
                KpiEnum kpiEnum = value.getKpi();
    
                // one PreparedStatement per KPI; the SQL text comes from the configuration
                String sql = this.conf.get(kpiEnum.name);
                PreparedStatement pstmt;
                int count;
                try {
                    pstmt = kpiTypeSQLMap.get(kpiEnum);
                    if (pstmt == null) {
                        // first record for this KPI: create and cache the statement
                        pstmt = this.conn.prepareStatement(sql);
                        kpiTypeSQLMap.put(kpiEnum, pstmt);
                    }
                    // track how many records are pending in this KPI's batch
                    count = batchMap.getOrDefault(kpiEnum, 0) + 1;
                    batchMap.put(kpiEnum, count);
    
                    // each KPI is mapped to its own output collector class: IOutputCollector
                    String collectorClassName = conf.get(GlobalConstants.OUTPUT_COLLECTOR_KEY_PREFIX + kpiEnum.name);
                    Class<?> clazz = Class.forName(collectorClassName);
                    // instantiate it reflectively; implementations must have a no-arg constructor
                    IOutputCollector collector = (IOutputCollector) clazz.newInstance();
                    collector.collect(conf, key, value, pstmt, converter);
    
                    // flush once the pending batch for this KPI reaches the configured size
                    if (count % conf.getInt(GlobalConstants.JDBC_BATCH_NUMBER, GlobalConstants.DEFAULT_JDBC_BATCH_NUMBER) == 0) {
                        pstmt.executeBatch();
                        conn.commit();
                        batchMap.remove(kpiEnum); // reset the pending count for this KPI
                    }
                } catch (Exception e) {
                    throw new IOException("数据输出产生异常", e);
                }
            }
    
            /**
             * Flushes whatever is still pending, commits, and releases every resource;
             * this is always called when the task finishes.
             * @param context
             * @throws IOException
             * @throws InterruptedException
             */
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                try {
                    try {
                        for (Map.Entry<KpiEnum, PreparedStatement> entry : this.kpiTypeSQLMap.entrySet()) {
                            entry.getValue().executeBatch();
                        }
                    } catch (Exception e) {
                        throw new IOException("输出数据出现异常", e);
                    } finally {
                        try {
                            if (conn != null) {
                                conn.commit();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            if (conn != null) {
                                for (Map.Entry<KpiEnum, PreparedStatement> entry : this.kpiTypeSQLMap.entrySet()) {
                                    try {
                                        entry.getValue().close();
                                    } catch (SQLException e) {
                                        e.printStackTrace();
                                    }
                                }
                                try {
                                    conn.close();
                                } catch (SQLException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                } finally {
                // shut down the RPC proxy to the dimension converter
                    DimensionConverterClient.stopDimensionConverterProxy(converter);
                }
            }
    
        }
    
    }
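
    The record writer delegates the actual parameter binding to an IOutputCollector looked up from
    the configuration, but no implementation is shown in the post. Below is a hypothetical sketch of
    one, assuming the collect(...) signature used above, getters on StatsUserDimensionKey matching
    the setters shown earlier, and an illustrative converter method getDimensionIdByValue; the bound
    columns must line up with whatever INSERT statement is configured for the KPI:

    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;

    public class LoseUserOutputCollector implements IOutputCollector {

        @Override
        public void collect(Configuration conf, KeyBaseDimension key, BaseStatsValueWritable value,
                            PreparedStatement pstmt, IDimensionConverter converter)
                throws SQLException, IOException {
            StatsUserDimensionKey statsKey = (StatsUserDimensionKey) key;
            MapWritableValue statsValue = (MapWritableValue) value;

            // the reducer stored the deduplicated user count under the marker key -1
            int retained = ((IntWritable) statsValue.getValue().get(new IntWritable(-1))).get();

            // bind the dimension ids (resolved via the RPC converter) and the metric;
            // getDimensionIdByValue is an assumed converter method name
            int i = 0;
            pstmt.setInt(++i, converter.getDimensionIdByValue(
                    statsKey.getCommonDimensionKey().getDateDimensionKey()));
            pstmt.setInt(++i, converter.getDimensionIdByValue(
                    statsKey.getCommonDimensionKey().getAppDimensionKey()));
            pstmt.setInt(++i, converter.getDimensionIdByValue(
                    statsKey.getCommonDimensionKey().getPlatformDimensionKey()));
            pstmt.setInt(++i, retained);

            // queue the row; TransformerRecordWriter.write decides when to executeBatch()
            pstmt.addBatch();
        }
    }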
     