zoukankan      html  css  js  c++  java
  • 第二个MapReduce

    大家在学习Hadoop的MapReduce的时候,90%的第一个程序都是WordCount,所以在这里分享一下我的第二个MapReduce程序。对于学习编程语言的人来说,有时候代码是最好的沟通方式之一。

     1 package com.zhongxin.mr;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.fs.Path;
     5 import org.apache.hadoop.io.LongWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Job;
     8 import org.apache.hadoop.mapreduce.Mapper;
     9 import org.apache.hadoop.mapreduce.Reducer;
    10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    11 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    13 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    14 
    15 import java.io.IOException;
    16 import java.math.BigDecimal;
    17 import java.util.regex.Pattern;
    18 
    19 /**
    20  * 用户已收本息
    21  * Created by DingYS on 2017/11/21.
    22  */
    23 public class UserReceiveAmount {
    24 
    25     public static class Map extends Mapper<LongWritable,Text,Text,Text>{
    26         private Text outKey = new Text();
    27         private Text outValue = new Text();
    28         private Pattern pattern = Pattern.compile(",");
    29 
    30 
    31         @Override
    32         public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
    33             // 利息
    34             BigDecimal interest = new BigDecimal(0);
    35             // 本金
    36             BigDecimal capital = new BigDecimal(0);
    37             String splits[] = pattern.split(String.valueOf(value));
    38             String onwerType = splits[2];
    39             String fundsDirection = splits[6];
    40             String tradeType = splits[5];
    41             String penaltyAmount = splits[15];
    42             String tradeAmount = splits[7];
    43             String tradeShare = splits[8];
    44             String ownerCustomNo = splits[1];
    45             if("USER".equals(onwerType) && "INCR".equals(fundsDirection) && !Pattern.matches("CURRENT_.*?",tradeType)){
    46                 if("INTEREST".equals(tradeType) && ("null".equals(penaltyAmount) || "".equals(penaltyAmount) ||"0.00".equals(penaltyAmount))){
    47                     interest =new BigDecimal(Double.parseDouble(tradeAmount)).setScale(2,BigDecimal.ROUND_HALF_UP);
    48                 }else{
    49                         interest = new BigDecimal(Double.parseDouble(tradeAmount)).subtract(new BigDecimal(Double.parseDouble(tradeShare))).setScale(2,BigDecimal.ROUND_HALF_UP);
    50                         capital = new BigDecimal(Double.parseDouble(tradeShare)).setScale(2,BigDecimal.ROUND_HALF_UP);
    51                 }
    52                 outKey.set(ownerCustomNo);
    53                 outValue.set(String.valueOf(interest) + pattern + String.valueOf(capital));
    54                 context.write(outKey,outValue);
    55             }
    56         }
    57     }
    58 
    59     public static class Reduce extends Reducer<Text,Text,Text,Text>{
    60 
    61         public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
    62             Text outValue = new Text();
    63             BigDecimal interest = new BigDecimal(0);
    64             BigDecimal capital = new BigDecimal(0);
    65             for(Text value:values){
    66                 String[] splits = value.toString().split(",");
    67                 interest = interest.add(new BigDecimal(Double.parseDouble(splits[0]))).setScale(2,BigDecimal.ROUND_HALF_UP);
    68                 capital = capital.add(new BigDecimal(Double.parseDouble(splits[1]))).setScale(2,BigDecimal.ROUND_HALF_UP);
    69             }
    70             outValue.set(String.valueOf(interest) + "\t" + String.valueOf(capital));
    71             context.write(key,outValue);
    72         }
    73     }
    74 
    75     public static void main(String[] args) throws Exception{
    76         Configuration config = new Configuration();
    77         Job job = Job.getInstance(config);
    78         job.setJobName("userReceiveAmount");
    79         job.setJarByClass(UserReceiveAmount.class);
    80 
    81         job.setOutputKeyClass(Text.class);
    82         job.setOutputValueClass(Text.class);
    83 
    84         job.setMapperClass(Map.class);
    85         job.setReducerClass(Reduce.class);
    86 
    87         job.setInputFormatClass(TextInputFormat.class);
    88         job.setOutputFormatClass(TextOutputFormat.class);
    89 
    90         FileInputFormat.addInputPath(job,new Path(args[0]));
    91         FileOutputFormat.setOutputPath(job,new Path(args[1]));
    92 
    93         job.waitForCompletion(true);
    94 
    95     }
    96 }

    对于看懂mapReduce这个程序,有一个非常关键的点就是:map每次读取一行数据,相同key的数据进入到同一个reduce中。

    上面是将统计结果输出到HDFS上,下面再来一个将结果输出到HBase中的版本,请看代码:

    package com.zhongxin.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    import java.math.BigDecimal;
    import java.util.regex.Pattern;
    
    /**
     * MapReduce job that totals, per customer, the interest and principal a user
     * has already received, and stores the totals in the HBase table
     * "userReceiveAmount" (column family "info", qualifiers "interest" and
     * "capital"), keyed by customer number.
     *
     * <p>Input is comma-separated text, one trade record per line.
     *
     * <p>Created by DingYS on 2017/11/21.
     */
    public class UserReceiveAmount {

        /** Separator between interest and principal in the mapper's output value. */
        private static final String FIELD_SEPARATOR = ",";

        /** Number of decimal places kept for every monetary amount. */
        private static final int MONEY_SCALE = 2;

        /** Rounding applied when an amount is reduced to {@link #MONEY_SCALE} places. */
        private static final java.math.RoundingMode ROUNDING = java.math.RoundingMode.HALF_UP;

        /**
         * Parses a monetary column straight from its decimal text.
         *
         * <p>Going through {@code double} first would introduce binary floating-point
         * error before rounding (for example "1.005" would round down to 1.00).
         *
         * @param raw the column text
         * @return the exact decimal value
         * @throws NumberFormatException if {@code raw} is not a decimal number
         */
        private static BigDecimal parseAmount(String raw) {
            return new BigDecimal(raw.trim());
        }

        /**
         * Renders an amount rounded to {@link #MONEY_SCALE} places, never in
         * scientific notation.
         *
         * @param amount the amount to render
         * @return the plain decimal string
         */
        private static String formatMoney(BigDecimal amount) {
            return amount.setScale(MONEY_SCALE, ROUNDING).toPlainString();
        }

        /**
         * Emits {@code customerNo -> "interest,principal"} for every record that is an
         * incoming ("INCR") trade owned by a "USER" and whose trade type does not
         * start with "CURRENT_".
         */
        public static class Map extends Mapper<LongWritable, Text, Text, Text> {

            /** Column delimiter of the input records; compiled once for all records. */
            private static final Pattern COMMA = Pattern.compile(",");

            // Zero-based column positions read from each record.
            private static final int COL_OWNER_CUSTOM_NO = 1;
            private static final int COL_OWNER_TYPE = 2;
            private static final int COL_TRADE_TYPE = 5;
            private static final int COL_FUNDS_DIRECTION = 6;
            private static final int COL_TRADE_AMOUNT = 7;
            private static final int COL_TRADE_SHARE = 8;
            private static final int COL_PENALTY_AMOUNT = 15;

            /** Smallest column count that still contains every column read above. */
            private static final int MIN_COLUMNS = COL_PENALTY_AMOUNT + 1;

            // Reused across calls to avoid allocating two Text objects per record.
            private final Text outKey = new Text();
            private final Text outValue = new Text();

            /**
             * Processes one input line.
             *
             * @param key     byte offset of the line in the input; used only in error messages
             * @param value   the comma-separated record
             * @param context sink for the emitted pair
             * @throws IOException           if the record has fewer than 16 columns
             * @throws NumberFormatException if an amount column of a selected record is not numeric
             */
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                if (line.trim().isEmpty()) {
                    // Blank lines carry no record; skip them instead of failing the task.
                    return;
                }

                // Limit -1 keeps trailing empty columns, so an empty penalty column at
                // the end of the line still yields index 15.
                String[] columns = COMMA.split(line, -1);
                if (columns.length < MIN_COLUMNS) {
                    throw new IOException("Expected at least " + MIN_COLUMNS
                            + " comma-separated columns but found " + columns.length
                            + " in the record at byte offset " + key.get());
                }

                String tradeType = columns[COL_TRADE_TYPE];
                boolean selected = "USER".equals(columns[COL_OWNER_TYPE])
                        && "INCR".equals(columns[COL_FUNDS_DIRECTION])
                        && !tradeType.startsWith("CURRENT_");
                if (!selected) {
                    return;
                }

                BigDecimal tradeAmount = parseAmount(columns[COL_TRADE_AMOUNT]);
                BigDecimal interest;
                BigDecimal capital;
                if ("INTEREST".equals(tradeType) && hasNoPenalty(columns[COL_PENALTY_AMOUNT])) {
                    // Pure interest payment: the whole amount is interest.
                    interest = tradeAmount;
                    capital = BigDecimal.ZERO;
                } else {
                    // Otherwise the share is principal and the remainder is interest.
                    BigDecimal tradeShare = parseAmount(columns[COL_TRADE_SHARE]);
                    interest = tradeAmount.subtract(tradeShare);
                    capital = tradeShare;
                }

                outKey.set(columns[COL_OWNER_CUSTOM_NO]);
                outValue.set(formatMoney(interest) + FIELD_SEPARATOR + formatMoney(capital));
                context.write(outKey, outValue);
            }

            /** Returns true when the penalty column is the text "null", empty, or "0.00". */
            private static boolean hasNoPenalty(String penaltyAmount) {
                return "null".equals(penaltyAmount)
                        || "".equals(penaltyAmount)
                        || "0.00".equals(penaltyAmount);
            }
        }

        /**
         * Sums the "interest,principal" pairs of one customer and emits a single
         * {@link Put} that stores both totals in the row named after the customer.
         */
        public static class Reduce extends TableReducer<Text, Text, ImmutableBytesWritable> {

            private static final byte[] FAMILY =
                    "info".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            private static final byte[] QUALIFIER_INTEREST =
                    "interest".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            private static final byte[] QUALIFIER_CAPITAL =
                    "capital".getBytes(java.nio.charset.StandardCharsets.UTF_8);

            // Reused across calls to avoid allocating a writable per key.
            private final ImmutableBytesWritable rowKeyOut = new ImmutableBytesWritable();

            /**
             * Aggregates all values of one customer.
             *
             * @param key     the customer number; becomes the HBase row key
             * @param values  "interest,principal" pairs for this customer
             * @param context sink for the emitted mutation
             */
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                BigDecimal interest = BigDecimal.ZERO;
                BigDecimal capital = BigDecimal.ZERO;
                for (Text value : values) {
                    String[] parts = value.toString().split(FIELD_SEPARATOR, -1);
                    interest = interest.add(parseAmount(parts[0]));
                    capital = capital.add(parseAmount(parts[1]));
                }

                // Go through toString(): Text.getBytes() exposes the internal buffer,
                // which is only valid up to getLength() and may hold stale bytes.
                byte[] rowKey = key.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

                Put put = new Put(rowKey);
                put.addColumn(FAMILY, QUALIFIER_INTEREST,
                        formatMoney(interest).getBytes(java.nio.charset.StandardCharsets.UTF_8));
                put.addColumn(FAMILY, QUALIFIER_CAPITAL,
                        formatMoney(capital).getBytes(java.nio.charset.StandardCharsets.UTF_8));

                rowKeyOut.set(rowKey);
                context.write(rowKeyOut, put);
            }
        }

        /**
         * Configures and runs the job.
         *
         * <p>Exits with 0 when the job succeeds, 1 when it fails, and 2 on bad usage.
         *
         * @param args {@code args[0]} is the input path
         */
        public static void main(String[] args) throws Exception {
            if (args.length < 1) {
                System.err.println("Usage: UserReceiveAmount <input path>");
                System.exit(2);
            }

            Configuration config = HBaseConfiguration.create();
            Job job = Job.getInstance(config, "userReceiveAmount");
            job.setJarByClass(UserReceiveAmount.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            job.setMapperClass(Map.class);
            TableMapReduceUtil.initTableReducerJob("userReceiveAmount", Reduce.class, job);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // The reducer emits (row key, mutation) pairs.
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Mutation.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

      注意点:构造Put时,rowkey要先转成String再取字节。如果rowkey是Text,不要直接调用Text.getBytes():它返回的是内部缓冲区,长度可能大于getLength(),会带上多余的字节;应先Text.toString()一下再getBytes()。Text中重写了toString()方法,经测试String.valueOf()也没问题

    Put put = new Put(rowkey.getBytes());
    

      

  • 相关阅读:
    关于object和embed
    关于跨域问题的解决办法
    SQL 中 SELECT 语句的执行顺序
    sql子查询 嵌套SELECT语句
    PL/SQL中SELECT总结
    SQL Server
    SQL Server SELECT逻辑处理顺序
    SQL 基础:Select语句,各种join,union用法
    sql基础知识(新手必备)
    mysql处理海量数据时的一些优化查询速度方法
  • 原文地址:https://www.cnblogs.com/Smilence1024/p/7884679.html
Copyright © 2011-2022 走看看