  • Hadoop joins (reduce-side and map-side)

    1. Large data sets: use a reduce-side join. The mapper reads both the customers and orders files and tags each line with a composite key (record type, cid, oid); the job partitions and groups by cid and sorts each group so that the customer record arrives first, and the reducer then joins the customer line with its order lines.

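    The input files themselves are not shown; from the way the mapper parses each line, the job assumes two comma-separated inputs under d:/mr/join: a customers file whose first field is the customer id (cid), and an orders file whose first field is the order id (oid) and whose fourth field is the cid. A minimal sketch of that assumption (the class and constant names below are hypothetical, not part of the original code):

    /** Hypothetical constants documenting the assumed comma-separated field positions. */
    public class JoinFieldLayout {
        // customers file: cid is field 0
        public static final int CUSTOMER_CID_FIELD = 0;
        // orders file: oid is field 0, cid is field 3
        public static final int ORDER_OID_FIELD = 0;
        public static final int ORDER_CID_FIELD = 3;
    }
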
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Driver for the reduce-side join job.
     */
    public class App {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            conf.set("mapreduce.framework.name", "local");

            Job job = Job.getInstance(conf);
            job.setJobName("ReduceJoinApp");
            job.setJarByClass(App.class);

            // input directory holding the customers and orders files; the output directory must not exist yet
            FileInputFormat.addInputPath(job, new Path("d:/mr/join"));
            FileOutputFormat.setOutputPath(job, new Path("d:/mr/join/out"));

            job.setMapperClass(ReduceJoinMapper.class);
            job.setReducerClass(ReduceJoinReducer.class);

            job.setMapOutputKeyClass(ComboKey.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // partitioner: route records by cid
            job.setPartitionerClass(CIDPartitioner.class);
            // sort comparator: order records within a partition
            job.setSortComparatorClass(ComboKeyComparator.class);
            // grouping comparator: group customer and order records by cid
            job.setGroupingComparatorClass(CIDGroupComparator.class);

            job.waitForCompletion(true);
        }
    }

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    /**
     * Composite key: record type + customer id + order id.
     */
    public class ComboKey implements WritableComparable<ComboKey> {
        // 0 = customer record, 1 = order record
        public int type;
        // customer id
        public int cid;
        // order id (only meaningful for order records)
        public int oid;

        public int compareTo(ComboKey o) {
            int type0 = o.type;
            int cid0 = o.cid;
            int oid0 = o.oid;
            // this key is a customer record
            if (type == 0) {
                // both are customer records: order by cid
                if (type0 == 0) {
                    return cid - cid0;
                }
                // the other key is an order: a customer sorts before its own orders
                else {
                    if (cid == cid0) {
                        return -1;
                    }
                    else {
                        return cid - cid0;
                    }
                }
            }
            // this key is an order record
            else {
                // the other key is a customer: it sorts before this order when the cids match
                if (type0 == 0) {
                    if (cid == cid0) {
                        return 1;
                    }
                    else {
                        return cid - cid0;
                    }
                }
                // both are orders: order by cid, then by oid
                else {
                    if (cid == cid0) {
                        return oid - oid0;
                    }
                    else {
                        return cid - cid0;
                    }
                }
            }
        }

        public void write(DataOutput out) throws IOException {
            out.writeInt(type);
            out.writeInt(cid);
            out.writeInt(oid);
        }

        public void readFields(DataInput in) throws IOException {
            this.type = in.readInt();
            this.cid = in.readInt();
            this.oid = in.readInt();
        }
    }
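
    To make the sort order concrete, here is a minimal local sketch (the class name and the sample cid/oid values are mine, not part of the original code) that sorts a few ComboKey instances with the compareTo above: the customer record comes out first, followed by its orders in ascending oid order.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ComboKeySortDemo {
        public static void main(String[] args) {
            ComboKey order1 = new ComboKey();
            order1.type = 1; order1.cid = 1; order1.oid = 7;   // an order for customer 1
            ComboKey customer = new ComboKey();
            customer.type = 0; customer.cid = 1;               // the customer record for cid 1
            ComboKey order2 = new ComboKey();
            order2.type = 1; order2.cid = 1; order2.oid = 3;   // another order for customer 1

            List<ComboKey> keys = new ArrayList<ComboKey>();
            keys.add(order1);
            keys.add(customer);
            keys.add(order2);
            Collections.sort(keys);

            // prints the customer (type=0) first, then the orders sorted by oid (3, then 7)
            for (ComboKey k : keys) {
                System.out.println("type=" + k.type + " cid=" + k.cid + " oid=" + k.oid);
            }
        }
    }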

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    /**
     * Sort comparator: delegates to ComboKey.compareTo().
     */
    public class ComboKeyComparator extends WritableComparator {
        protected ComboKeyComparator() {
            super(ComboKey.class, true);
        }

        public int compare(WritableComparable w1, WritableComparable w2) {
            ComboKey k1 = (ComboKey) w1;
            ComboKey k2 = (ComboKey) w2;
            return k1.compareTo(k2);
        }
    }

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    /**
     * Partitioner: all records with the same cid go to the same reducer.
     */
    public class CIDPartitioner extends Partitioner<ComboKey, Text> {
        public int getPartition(ComboKey key, Text text, int numPartitions) {
            return key.cid % numPartitions;
        }
    }

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    /**
     * Grouping comparator: groups map output by cid only, so a customer
     * record and all of its orders arrive in the same reduce() call.
     */
    public class CIDGroupComparator extends WritableComparator {
        protected CIDGroupComparator() {
            super(ComboKey.class, true);
        }

        public int compare(WritableComparable w1, WritableComparable w2) {
            ComboKey k1 = (ComboKey) w1;
            ComboKey k2 = (ComboKey) w2;
            return k1.cid - k2.cid;
        }
    }
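
    The grouping comparator is what pulls a customer record and all of its orders into a single reduce() call even though their composite keys differ; a small local sketch (the class name and sample values are mine, not from the original) of that behavior:

    public class GroupingDemo {
        public static void main(String[] args) {
            ComboKey customer = new ComboKey();
            customer.type = 0; customer.cid = 1;
            ComboKey order = new ComboKey();
            order.type = 1; order.cid = 1; order.oid = 5;

            // prints 0: the keys differ in type and oid, but the grouping
            // comparator only looks at cid, so both fall into the same group
            System.out.println(new CIDGroupComparator().compare(customer, order));
        }
    }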

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import java.io.IOException;

    /**
     * Mapper: tags each input line with a ComboKey based on which file it came from.
     */
    public class ReduceJoinMapper extends Mapper<LongWritable, Text, ComboKey, Text> {
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");
            FileSplit split = (FileSplit) context.getInputSplit();
            ComboKey k = new ComboKey();
            // customers file: cid is the first field
            if (split.getPath().toString().contains("customers")) {
                k.type = 0;
                k.cid = Integer.parseInt(arr[0]);
            }
            // orders file: oid is the first field, cid is the fourth field
            else {
                k.type = 1;
                k.cid = Integer.parseInt(arr[3]); // cid
                k.oid = Integer.parseInt(arr[0]); // oid
            }
            context.write(k, value);
        }
    }

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.util.Iterator;

    /**
     * Reducer: joins each customer record with the order records grouped under the same cid.
     */
    public class ReduceJoinReducer extends Reducer<ComboKey, Text, Text, NullWritable> {

        protected void reduce(ComboKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> it = values.iterator();
            // because of the sort order, the group's key is the customer key whenever a customer record exists
            int type = key.type;
            String custInfo = null;
            // the group starts with the customer record
            if (type == 0) {
                Text custText = it.next();
                custInfo = custText.toString();
                context.write(new Text(custInfo), NullWritable.get());
                // the remaining values are that customer's orders
                while (it.hasNext()) {
                    String orderInfo = it.next().toString();
                    context.write(new Text(custInfo + "," + orderInfo), NullWritable.get());
                }
            }
            // orders with no matching customer record
            else {
                while (it.hasNext()) {
                    String orderInfo = it.next().toString();
                    context.write(new Text("null," + orderInfo), NullWritable.get());
                }
            }
        }
    }


    2. Small lookup table: use a map-side join. When one side (here the customers file) is small enough to load directly into memory, each mapper reads it into a HashMap in setup() and joins every order line against it in map(), so no reduce phase is needed.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Driver for the map-side join job.
     */
    public class App {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            conf.set("mapreduce.framework.name", "local");
            // path of the small customers file that every mapper loads in setup()
            conf.set("customers.data.path", "file:///d:/mr/customers.txt");

            // create the job
            Job job = Job.getInstance(conf);
            job.setJobName("MapJoin");
            job.setJarByClass(App.class);

            // add the input path (can be called more than once)
            FileInputFormat.addInputPath(job, new Path("d:/mr/orders.txt"));
            // set the output path; only one is allowed and the directory must not already exist
            FileOutputFormat.setOutputPath(job, new Path("d:/mr/out"));

            // map-only job: set the mapper and disable the reduce phase
            job.setMapperClass(JoinMapper.class);
            job.setNumReduceTasks(0);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.waitForCompletion(true);
        }
    }

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * Map-side join: loads all customers into memory, then joins each order line against them.
     */
    public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // all customers, keyed by cid
        private Map<String, String> allCustomers = new HashMap<String, String>();

        /**
         * Read the customers file into memory before any map() calls.
         */
        protected void setup(Context context) throws IOException, InterruptedException {
            // open the customers file through the configured file system
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            String path = conf.get("customers.data.path");
            FSDataInputStream in = fs.open(new Path(path));
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = null;
            while ((line = reader.readLine()) != null) {
                String cid = line.split(",")[0];
                allCustomers.put(cid, line);
            }
            reader.close();
        }

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");
            // cid is the fourth field of an order line
            String cid = arr[3];
            String custInfo = allCustomers.get(cid);
            context.write(new Text(custInfo + "," + line), NullWritable.get());
        }
    }
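
    One detail worth noting: when an order's cid has no entry in allCustomers, get(cid) returns null, so the emitted line starts with the literal string "null", which matches the reduce-side job's output for orders without a customer. A hedged variant of map() (my own sketch, not from the original) that would make that case explicit inside JoinMapper:

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");
            String cid = arr[3];
            String custInfo = allCustomers.get(cid);
            // explicit handling of an order whose customer is not in the lookup table
            if (custInfo == null) {
                custInfo = "null";
            }
            context.write(new Text(custInfo + "," + line), NullWritable.get());
        }
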
  • Original article: https://www.cnblogs.com/yihaifutai/p/6931522.html