zoukankan      html  css  js  c++  java
  • MapReduce 实现从 HBase 中统计数据，结果存入 MySQL 中

    最近开始学习使用 MapReduce 统计 HBase 中的数据，并将结果集存入 MySQL 中，供前台查询使用。

    使用的 Hadoop 版本为 2.5.1，HBase 版本为 0.98.6.1。

    MapReduce 程序分为三个部分：Job、Map 函数、Reduce 函数；此外还需要一个实现 DBWritable 接口的输出实体类 FaultDay，以及一个启动任务的主类。

    job类:

     1 public class DayFaultStatisticsJob {
     2     private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsJob.class);
     3 
     4     public void runJob(String start){
     5         try {
     6             logger.info("开始运行mapreduce,统计故障信息");
     7             Configuration config = HBaseConfiguration.create();
     8             // 传递统计条件给map
     9             config.set("search.time.start",start);
    10 
    11             DBConnector dbConnector = DBConnectorUtil.getDBConnector();
    12             DBConfiguration.configureDB(config, "com.mysql.jdbc.Driver", "jdbc:mysql://" + dbConnector.getHost() + ":" + dbConnector.getPort() + "/xcloud", dbConnector.getUser(), dbConnector.getPwd());
    13 
    14             Job job = Job.getInstance(config);
    15             job.setJarByClass(DayFaultStatisticsJob.class); // class that contains mapper and reducer
    16             Scan scan = new Scan();
    17             scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    18             scan.setCacheBlocks(false);  // don't set to true for MR jobs
    19             scan.addFamily(GlobalConstants.DEVICE_FAULT_FAMILY_NAME);
    20 
    21             // set other scan attrs
    22             // TODO 增加遍历条件
    23             /*scan.setStartRow();
    24               scan.setStopRow();*/
    25             //scan.setFilter();
    26 
    27             TableMapReduceUtil.initTableMapperJob(
    28                     GlobalConstants.TABLE_NAME_DEVICE_FAULT,          // input table
    29                     scan,                // Scan instance to control CF and attribute selection
    30                     DayFaultStatisticsMapper.class,     // mapper class
    31                     Text.class,         // mapper output key
    32                     IntWritable.class,  // mapper output value
    33                     job);
    34             job.setReducerClass(DayFaultStatisticsReducer.class);    // reducer class
    35             job.setNumReduceTasks(2);    // at least one, adjust as required
    36             //FileOutputFormat.setOutputPath(job, new Path("/usr/local/mapreduce"));  // adjust directories as required
    37             DBOutputFormat.setOutput(job, GlobalConstants.MYSQL_DEVICE_FAULT_DAY, GlobalConstants.MYSQL_DEVICE_FAULT_DAY_FIELDS);
    38             boolean b = job.waitForCompletion(true);
    39             if (b) {
    40                 logger.info("mapreduce任务正常结束");
    41                 System.exit(0);
    42             } else {
    43                 logger.info("mapreduce任务异常结束");
    44                 System.exit(1);
    45             }
    46         } catch (IOException e) {
    47             logger.error(e.getMessage(),e);
    48         } catch (InterruptedException e) {
    49             logger.error(e.getMessage(),e);
    50         } catch (ClassNotFoundException e) {
    51             logger.error(e.getMessage(),e);
    52         }
    53     }
    54 
    55 }

    mapper类:

    public class DayFaultStatisticsMapper extends TableMapper<Text, IntWritable> {
        private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsMapper.class);
    
        private Text text = new Text();
        private IntWritable ONE = new IntWritable(1);
        HashMap<String,String> conditionMap = new HashMap<String,String>();
    
        // 接收过滤条件
        protected void setup(Context context) throws IOException,
                InterruptedException {
            conditionMap.put("start",context.getConfiguration().get("search.time.start").trim());
        }
    
        public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String timestamp = null;
            String company_id = null;
            String product_id = null;
            String model_id = null;
            String fault_id = null;
            String province_id = null;
            String city_id = null;
            String district_id = null;
            for(java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){
                String ikey = new String(val.getKey());
                String ivalue = new String(val.getValue());
                // 按条件进行map
                if (StringUtil.isEmpty(ikey) || StringUtil.isEmpty(ivalue)){
                     return;
                } else if (ikey.equals("company_id")){
                    company_id = ivalue;
                    continue;
                } else if (ikey.equals("product_id")){
                    product_id = ivalue;
                    continue;
                } else if (ikey.equals("model_id")){
                    model_id = ivalue;
                    continue;
                } else if (ikey.equals("fault_id")){
                    fault_id = ivalue;
                    continue;
                } else if (ikey.equals("province_id")){
                    province_id = ivalue;
                    continue;
                } else if (ikey.equals("city_id")){
                    city_id = ivalue;
                    continue;
                } else if (ikey.equals("district_id")){
                    district_id = ivalue;
                    continue;
                } else if (ikey.equals("timestamp")){
                    String time = ivalue.substring(0,8);// 判断是否是当日
                    if (time.equals(conditionMap.get("start"))){
                        timestamp = time; // 统计日故障发生次数
                    }
                    continue;
                }
            }
            if (company_id == null || product_id ==  null || model_id == null || fault_id == null || timestamp == null || fault_id.equals("-1")) {
                return;
            }
            // 故障码为 -1的不统计
            String val = company_id + "-" +product_id + "-" +model_id + "-"+fault_id + "-"+timestamp + "-"+province_id + "-"+ city_id+ "-"+ district_id; // 分组key
            text.set(val);     // we can only emit Writables...
            context.write(text, ONE);
        }
    }

    reducer类:

     1 public class DayFaultStatisticsReducer extends Reducer<Text, IntWritable, FaultDay, Text> {
     2     private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsReducer.class);
     3 
     4     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
     5         int value = 0;
     6         for (IntWritable val : values) {
     7             value += val.get();
     8         }
     9         //company_id + "-" +product_id + "-" +model_id + "-"+fault_id + "-"+timestamp + "-"+province_id + "-"+ city_id+ "-"+ district_id
    10         String fields[] = key.toString().split("-");
    11         if (fields.length != 8){
    12             return;
    13         }
    14         // 处理day day格式:2015-02-01
    15         String day = fields[4].substring(0,4) + "-"  + fields[4].substring(4,6) + "-" + fields[4].substring(6,8);
    16         // DateUtil.format(new Date(), new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
    17         FaultDay faultDay = new FaultDay();
    18         faultDay.setId(UUIDGen.generate());
    19         faultDay.setDay(day);
    20         faultDay.setCompany_id(fields[0]);
    21         faultDay.setProduct_id(fields[1]);
    22         faultDay.setModel_id(fields[2]);
    23         faultDay.setFault_id(fields[3]);
    24         if (!fields[5].equals("null")){
    25             faultDay.setProvince_id(fields[5]);
    26         }
    27         if (!fields[6].equals("null")){
    28             faultDay.setCity_id(fields[6]);
    29         }
    30         if (!fields[7].equals("null")){
    31             faultDay.setDistrict_id(fields[7]);
    32         }
    33         faultDay.setNum(value);
    34         context.write(faultDay, null);
    35     }
    36 
    37 }


    FaultDay 类（实现 Writable 与 DBWritable 接口，作为写入 MySQL 的记录）：

      1 public class FaultDay implements Writable, DBWritable {
      2     private String id;
      3 
      4     private String day; // 2015-02-01
      5     private String company_id;
      6     private String product_id;
      7     private String model_id;
      8     private String fault_id;
      9     private String province_id;
     10     private String city_id;
     11     private String district_id;
     12     private int num;
     13 
     14     @Override
     15     public void write(PreparedStatement statement) throws SQLException {
     16         int index = 1;
     17         statement.setString(index++, this.getId());
     18         statement.setString(index++, this.getDay());
     19         statement.setString(index++, this.getCompany_id());
     20         statement.setString(index++, this.getProduct_id());
     21         statement.setString(index++, this.getModel_id());
     22         statement.setString(index++, this.getFault_id());
     23         statement.setString(index++, this.getProvince_id());
     24         statement.setString(index++, this.getCity_id());
     25         statement.setString(index++, this.getDistrict_id());
     26         statement.setInt(index++, this.getNum());
     27     }
     28 
     29     @Override
     30     public void readFields(ResultSet resultSet) throws SQLException {
     31         this.id = resultSet.getString(1);
     32         this.day = resultSet.getString(2);
     33         this.company_id = resultSet.getString(3);
     34         this.product_id = resultSet.getString(4);
     35         this.model_id = resultSet.getString(5);
     36         this.fault_id = resultSet.getString(6);
     37         this.province_id = resultSet.getString(7);
     38         this.city_id = resultSet.getString(8);
     39         this.district_id = resultSet.getString(9);
     40         this.num = resultSet.getInt(10);
     41     }
     42 
     43     @Override
     44     public void write(DataOutput out) throws IOException {
     45         //To change body of implemented methods use File | Settings | File Templates.
     46     }
     47 
     48     @Override
     49     public void readFields(DataInput in) throws IOException {
     50         //To change body of implemented methods use File | Settings | File Templates.
     51     }
     52 
     53 
     54     public String getCity_id() {
     55         return city_id;
     56     }
     57 
     58     public void setCity_id(String city_id) {
     59         this.city_id = city_id;
     60     }
     61 
     62     public String getCompany_id() {
     63         return company_id;
     64     }
     65 
     66     public void setCompany_id(String company_id) {
     67         this.company_id = company_id;
     68     }
     69 
     70     public String getDay() {
     71         return day;
     72     }
     73 
     74     public void setDay(String day) {
     75         this.day = day;
     76     }
     77 
     78     public String getDistrict_id() {
     79         return district_id;
     80     }
     81 
     82     public void setDistrict_id(String district_id) {
     83         this.district_id = district_id;
     84     }
     85 
     86     public String getFault_id() {
     87         return fault_id;
     88     }
     89 
     90     public void setFault_id(String fault_id) {
     91         this.fault_id = fault_id;
     92     }
     93 
     94     public String getId() {
     95         return id;
     96     }
     97 
     98     public void setId(String id) {
     99         this.id = id;
    100     }
    101 
    102     public String getModel_id() {
    103         return model_id;
    104     }
    105 
    106     public void setModel_id(String model_id) {
    107         this.model_id = model_id;
    108     }
    109 
    110     public int getNum() {
    111         return num;
    112     }
    113 
    114     public void setNum(int num) {
    115         this.num = num;
    116     }
    117 
    118     public String getProduct_id() {
    119         return product_id;
    120     }
    121 
    122     public void setProduct_id(String product_id) {
    123         this.product_id = product_id;
    124     }
    125 
    126     public String getProvince_id() {
    127         return province_id;
    128     }
    129 
    130     public void setProvince_id(String province_id) {
    131         this.province_id = province_id;
    132     }
    133 }


    主类:

    /** Command-line entry point for the daily fault statistics job. */
    public class FaultStatistics {
        private static final Logger logger = LoggerFactory.getLogger(FaultStatistics.class);

        /** Length of the yyyyMMdd day expected at the start of args[0]. */
        private static final int DAY_LENGTH = 8;

        /**
         * Determines the day to count and runs the job.
         *
         * @param args optional; args[0] is a date string whose first 8 characters are the day
         *             in yyyyMMdd form (e.g. "20150201" or "20150201120000"). When it is
         *             absent or empty, the previous day (DateUtil.getPreDay) is used. An
         *             argument that is not a valid day is logged and the process exits with 1.
         */
        public static void main(String[] args) {
            String start;
            if (args.length != 0 && StringUtil.notEmpty(args[0])) {
                start = parseDay(args[0]);
                if (start == null) {
                    logger.error("Invalid start day '{}', expected yyyyMMdd", args[0]);
                    System.exit(1);
                    return;
                }
            } else {
                // Default: the day before today.
                start = DateUtil.format(DateUtil.getPreDay(new Date()), new SimpleDateFormat("yyyyMMdd"));
            }

            DayFaultStatisticsJob dayFaultStatisticsJob = new DayFaultStatisticsJob();
            dayFaultStatisticsJob.runJob(start);
        }

        /**
         * Returns the first 8 characters of {@code arg} (after trimming) when they form a
         * valid yyyyMMdd calendar date, otherwise null.
         */
        private static String parseDay(String arg) {
            String trimmed = arg.trim();
            if (trimmed.length() < DAY_LENGTH) {
                return null;
            }
            String day = trimmed.substring(0, DAY_LENGTH);
            SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
            format.setLenient(false); // reject e.g. month 13 or day 32
            java.text.ParsePosition position = new java.text.ParsePosition(0);
            Date parsed = format.parse(day, position);
            return parsed != null && position.getIndex() == DAY_LENGTH ? day : null;
        }
    }


    打成 jar 包后在 Hadoop 中运行，即可在 MySQL 中查询到运行结果（MySQL 中需预先创建对应的表）。

  • 相关阅读:
    内部类
    三大修饰符:static、final、abstract
    面向对象三大特性
    类和方法
    Vue-创建工程+element UI
    xshell连接虚拟机较慢问题 -----已解决
    Hbase配置
    Hive的安装配置
    Hive内容+配置
    Redis全局命令
  • 原文地址:https://www.cnblogs.com/iiot/p/4479237.html
Copyright © 2011-2022 走看看