  • The Second MapReduce Program

    When people learn Hadoop MapReduce, the first program is almost always WordCount, so here I'd like to share my second MapReduce program instead. For programmers, code is sometimes one of the best ways to communicate.

    package com.zhongxin.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import java.io.IOException;
    import java.math.BigDecimal;
    import java.util.regex.Pattern;

    /**
     * Principal and interest already received per user
     * Created by DingYS on 2017/11/21.
     */
    public class UserReceiveAmount {

        public static class Map extends Mapper<LongWritable, Text, Text, Text> {
            private Text outKey = new Text();
            private Text outValue = new Text();
            private Pattern pattern = Pattern.compile(",");

            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // interest
                BigDecimal interest = new BigDecimal(0);
                // principal
                BigDecimal capital = new BigDecimal(0);
                String[] splits = pattern.split(String.valueOf(value));
                String ownerType = splits[2];
                String fundsDirection = splits[6];
                String tradeType = splits[5];
                String penaltyAmount = splits[15];
                String tradeAmount = splits[7];
                String tradeShare = splits[8];
                String ownerCustomNo = splits[1];
                if ("USER".equals(ownerType) && "INCR".equals(fundsDirection) && !Pattern.matches("CURRENT_.*?", tradeType)) {
                    if ("INTEREST".equals(tradeType) && ("null".equals(penaltyAmount) || "".equals(penaltyAmount) || "0.00".equals(penaltyAmount))) {
                        interest = new BigDecimal(Double.parseDouble(tradeAmount)).setScale(2, BigDecimal.ROUND_HALF_UP);
                    } else {
                        interest = new BigDecimal(Double.parseDouble(tradeAmount)).subtract(new BigDecimal(Double.parseDouble(tradeShare))).setScale(2, BigDecimal.ROUND_HALF_UP);
                        capital = new BigDecimal(Double.parseDouble(tradeShare)).setScale(2, BigDecimal.ROUND_HALF_UP);
                    }
                    outKey.set(ownerCustomNo);
                    // emit "interest,capital" keyed by the customer number
                    outValue.set(String.valueOf(interest) + "," + String.valueOf(capital));
                    context.write(outKey, outValue);
                }
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                Text outValue = new Text();
                BigDecimal interest = new BigDecimal(0);
                BigDecimal capital = new BigDecimal(0);
                // sum interest and principal over all records of the same customer
                for (Text value : values) {
                    String[] splits = value.toString().split(",");
                    interest = interest.add(new BigDecimal(Double.parseDouble(splits[0]))).setScale(2, BigDecimal.ROUND_HALF_UP);
                    capital = capital.add(new BigDecimal(Double.parseDouble(splits[1]))).setScale(2, BigDecimal.ROUND_HALF_UP);
                }
                outValue.set(String.valueOf(interest) + "\t" + String.valueOf(capital));
                context.write(key, outValue);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            Job job = Job.getInstance(config);
            job.setJobName("userReceiveAmount");
            job.setJarByClass(UserReceiveAmount.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }
    }

    The key to understanding this MapReduce program is: map processes the input one line at a time, and all records with the same key are delivered to the same reduce call.
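
    To make this concrete, here is a hypothetical walk-through (the customer numbers and amounts below are made up for illustration; only the "interest,capital" value format matches the code above). Records emitted by map with the same ownerCustomNo are grouped together and handed to a single reduce call:

        map output (one "interest,capital" pair per qualifying input line):
            ("10001", "12.34,0.00")
            ("10001", "5.00,100.00")
            ("10002", "8.88,200.00")

        reduce input after the shuffle (values grouped by key):
            "10001" -> ["12.34,0.00", "5.00,100.00"]
            "10002" -> ["8.88,200.00"]

        reduce output (key, then interest and capital separated by tabs):
            10001    17.34    100.00
            10002    8.88     200.00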

    The version above writes the aggregated results to HDFS; below is a version that writes the output to HBase instead. Here is the code:

    package com.zhongxin.mr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import java.io.IOException;
    import java.math.BigDecimal;
    import java.util.regex.Pattern;

    /**
     * Principal and interest already received per user, written to HBase
     * Created by DingYS on 2017/11/21.
     */
    public class UserReceiveAmount {

        public static class Map extends Mapper<LongWritable, Text, Text, Text> {
            private Text outKey = new Text();
            private Text outValue = new Text();
            private Pattern pattern = Pattern.compile(",");

            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // interest
                BigDecimal interest = new BigDecimal(0);
                // principal
                BigDecimal capital = new BigDecimal(0);
                String[] splits = pattern.split(String.valueOf(value));
                String ownerType = splits[2];
                String fundsDirection = splits[6];
                String tradeType = splits[5];
                String penaltyAmount = splits[15];
                String tradeAmount = splits[7];
                String tradeShare = splits[8];
                String ownerCustomNo = splits[1];
                if ("USER".equals(ownerType) && "INCR".equals(fundsDirection) && !Pattern.matches("CURRENT_.*?", tradeType)) {
                    if ("INTEREST".equals(tradeType) && ("null".equals(penaltyAmount) || "".equals(penaltyAmount) || "0.00".equals(penaltyAmount))) {
                        interest = new BigDecimal(Double.parseDouble(tradeAmount)).setScale(2, BigDecimal.ROUND_HALF_UP);
                    } else {
                        interest = new BigDecimal(Double.parseDouble(tradeAmount)).subtract(new BigDecimal(Double.parseDouble(tradeShare))).setScale(2, BigDecimal.ROUND_HALF_UP);
                        capital = new BigDecimal(Double.parseDouble(tradeShare)).setScale(2, BigDecimal.ROUND_HALF_UP);
                    }
                    outKey.set(ownerCustomNo);
                    // emit "interest,capital" keyed by the customer number
                    outValue.set(String.valueOf(interest) + "," + String.valueOf(capital));
                    context.write(outKey, outValue);
                }
            }
        }

        public static class Reduce extends TableReducer<Text, Text, ImmutableBytesWritable> {

            ImmutableBytesWritable k = new ImmutableBytesWritable();

            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                BigDecimal interest = new BigDecimal(0);
                BigDecimal capital = new BigDecimal(0);
                for (Text value : values) {
                    String[] splits = value.toString().split(",");
                    interest = interest.add(new BigDecimal(Double.parseDouble(splits[0]))).setScale(2, BigDecimal.ROUND_HALF_UP);
                    capital = capital.add(new BigDecimal(Double.parseDouble(splits[1]))).setScale(2, BigDecimal.ROUND_HALF_UP);
                }
                String family = "info";
                // rowkey is the customer number; convert the Text key to a String first
                Put put = new Put(String.valueOf(key).getBytes());
                put.addColumn(family.getBytes(), "interest".getBytes(), String.valueOf(interest).getBytes());
                put.addColumn(family.getBytes(), "capital".getBytes(), String.valueOf(capital).getBytes());
                // only use the valid part of the Text backing array for the output key
                k.set(key.getBytes(), 0, key.getLength());
                context.write(k, put);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            Job job = Job.getInstance(config, "userReceiveAmount");
            job.setJarByClass(UserReceiveAmount.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            job.setMapperClass(Map.class);
            // wires up the reducer, TableOutputFormat and the target table "userReceiveAmount"
            TableMapReduceUtil.initTableReducerJob("userReceiveAmount", Reduce.class, job);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Mutation.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

      Note: the rowkey should be a String. If you have a Text object, call toString() on it first (Text overrides toString()); in my tests String.valueOf() works just as well.

    Put put = new Put(rowkey.getBytes());
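
      For illustration, a minimal sketch of that conversion (the key value below is made up). Text.getBytes() returns the backing byte array, which can be longer than the actual content, so going through a String first is the safer route:

    Text key = new Text("10001");                          // hypothetical reduce key
    Put safePut = new Put(key.toString().getBytes());      // safe: convert to String first
    // Put safePut = new Put(String.valueOf(key).getBytes()); // also fine, as noted above
    // new Put(key.getBytes())                             // risky: may carry stale trailing bytes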
    

      

  • Original post: https://www.cnblogs.com/Smilence1024/p/7884679.html