  • MapReduce implementation of the classic "people who viewed this item also viewed" feature

    Reposted from: http://blog.csdn.net/u011750989/article/details/12004065

    The approach looks sound; it is one implementation of collaborative filtering.

    Note: the field separator is 0x01 (ASCII code 1, the SOH, "start of heading", control character). In vi you can enter it by typing Ctrl+V followed by Ctrl+A. Other common separators are the tab character (ASCII 9) and the space (ASCII 32).
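
    As a quick illustration, here is a minimal standalone sketch of splitting such a line in plain Java. The sample record and the field positions are assumptions taken from the first mapper below, which uses field 0 as the date, field 1 as the cookie id, and fields 4 + 3 as city + house id:

    public class SeparatorDemo {
        public static void main(String[] args) {
            // Hypothetical sample record; the real fields are joined by the 0x01 (SOH) character.
            String line = "2013-09-01" + "\001" + "cookie123" + "\001" + "pv"
                        + "\001" + "0001" + "\001" + "bj";
            String[] s = line.split("\001");   // "\001" is Java's octal escape for 0x01
            String thedate = s[0];             // date
            String cookie  = s[1];             // cookie id
            String houseId = s[4] + s[3];      // city + house id, as combined in TokenizerMapper
            System.out.println(thedate + " " + cookie + " " + houseId);
        }
    }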

    Input:

    date ... cookie id ... item id ...

    xx xx xx

    Output:

    item id    list of item ids (sorted by priority, comma-separated)

    xx xx

    For example:

    id1 id3,id0,id4,id2

    id2 id0,id5

    The whole computation is done in four steps:

    1. Extract (date, cookie id, item id) from the raw logs and process the data day by day; the final output, keyed by date, has the format

    item id-0    item id-1

    xx x x

    This step includes one optimization: item id-0 is always smaller than item id-1, which halves the storage; the pairs can simply be transposed when the data is aggregated at the end.

    The reducer does local sorting and de-duplication.

    2. Aggregate the output of step 1, still per day:

    item id-0    item id-1    association value (the association value is the number of users who viewed both items)

    xx x x xx

    3. Aggregate the last three months of data, applying time decay so that the older a record is, the less its association value contributes, and output the association value for every item pair (including the transposed pair); a small worked example of the decay factor follows this list.

    4. Pivot rows to columns to produce the final recommendation data, with the related items sorted by association value. (A sketch of chaining all four jobs appears after the last code listing.)
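
    To make the decay in step 3 concrete: the third job's mapper (shown later) computes factor = 1 / (1 + (day - 1) / 10), where day is the number of days between the record's date and the day the job runs. A pair seen yesterday (day = 1) keeps its full weight (factor = 1.0), a pair seen 11 days ago is halved (factor = 0.5), and a pair seen roughly three months ago (day = 91) contributes only a tenth (factor = 0.1). Each day's raw co-occurrence count is multiplied by its factor before the final sum.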

    First MR job

    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.log4j.Logger;

    /*
     * Input: raw log data (may contain duplicates)
     *   date  cookie  house id
     *
     * Output:
     *   date  house id1  house id2   // house id1 is always smaller than house id2;
     *                                // records are grouped by (date, cookie)
     */
    public class HouseMergeAndSplit {

        public static class Partitioner1 extends Partitioner<TextPair, Text> {
            @Override
            public int getPartition(TextPair key, Text value, int numPartition) {
                return Math.abs((new Text(key.getFirst().toString()
                        + key.getSecond().toString())).hashCode() * 127) % numPartition;
            }
        }

        public static class Comp1 extends WritableComparator {
            public Comp1() {
                super(TextPair.class, true);
            }
            @SuppressWarnings("unchecked")
            public int compare(WritableComparable a, WritableComparable b) {
                TextPair t1 = (TextPair) a;
                TextPair t2 = (TextPair) b;
                int comp = t1.getFirst().compareTo(t2.getFirst());
                if (comp != 0)
                    return comp;
                return t1.getSecond().compareTo(t2.getSecond());
            }
        }

        public static class TokenizerMapper
                extends Mapper<LongWritable, Text, TextPair, Text> {
            Text val = new Text("test");
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String s[] = value.toString().split("\001");         // fields are 0x01-separated
                TextPair tp = new TextPair(s[0], s[1], s[4] + s[3]);  // date, cookie, city + house id
                context.write(tp, val);
            }
        }

        public static class IntSumReducer
                extends Reducer<TextPair, Text, Text, Text> {
            private static String comparedColumn[] = new String[3];
            ArrayList<String> houselist = new ArrayList<String>();
            private static Text keyv = new Text();
            private static Text valuev = new Text();
            static Logger logger = Logger.getLogger(HouseMergeAndSplit.class.getName());

            public void reduce(TextPair key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                houselist.clear();
                String thedate = key.getFirst().toString();
                String cookie = key.getSecond().toString();
                for (int i = 0; i < 3; i++)
                    comparedColumn[i] = "";
                // (first, second) is the grouping key, so reduce() is called once per (date, cookie);
                // values arrive sorted by house id, so consecutive duplicates are skipped
                for (Text val : values) {
                    if (thedate.equals(comparedColumn[0]) && cookie.equals(comparedColumn[1])
                            && !key.getThree().toString().equals(comparedColumn[2])) {
                        houselist.add(key.getThree().toString());
                        comparedColumn[0] = key.getFirst().toString();
                        comparedColumn[1] = key.getSecond().toString();
                        comparedColumn[2] = key.getThree().toString();
                    }
                    if (!thedate.equals(comparedColumn[0]) || !cookie.equals(comparedColumn[1])) {
                        houselist.add(key.getThree().toString());
                        comparedColumn[0] = key.getFirst().toString();
                        comparedColumn[1] = key.getSecond().toString();
                        comparedColumn[2] = key.getThree().toString();
                    }
                }
                keyv.set(comparedColumn[0]); // the date
                // emit every pair of houses viewed by this (date, cookie)
                for (int i = 0; i < houselist.size() - 1; i++) {
                    for (int j = i + 1; j < houselist.size(); j++) {
                        valuev.set(houselist.get(i) + " " + houselist.get(j)); // associated houses
                        context.write(keyv, valuev);
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseMergeAndSplit <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseMergeAndSplit");
            job.setNumReduceTasks(4);
            job.setJarByClass(HouseMergeAndSplit.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setMapOutputKeyClass(TextPair.class);
            job.setMapOutputValueClass(Text.class);
            // partitioner: partition by (date, cookie)
            job.setPartitionerClass(Partitioner1.class);
            // group by (date, cookie) after partitioning
            job.setGroupingComparatorClass(Comp1.class);
            // reducer and its output types
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    TextPair

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    /*
     * Composite key: (date, cookie, city + house id). The full sort order covers
     * all three fields, so within a (date, cookie) group the values arrive at the
     * reducer sorted by house id.
     */
    public class TextPair implements WritableComparable<TextPair> {
        private Text first;
        private Text second;
        private Text three;

        public TextPair() {
            set(new Text(), new Text(), new Text());
        }
        public TextPair(String first, String second, String three) {
            set(new Text(first), new Text(second), new Text(three));
        }
        public TextPair(Text first, Text second, Text three) {
            set(first, second, three);
        }
        public void set(Text first, Text second, Text three) {
            this.first = first;
            this.second = second;
            this.three = three;
        }
        public Text getFirst() {
            return first;
        }
        public Text getSecond() {
            return second;
        }
        public Text getThree() {
            return three;
        }
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
            three.write(out);
        }
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
            three.readFields(in);
        }
        public int compareTo(TextPair tp) {
            int cmp = first.compareTo(tp.first);
            if (cmp != 0) {
                return cmp;
            }
            cmp = second.compareTo(tp.second);
            if (cmp != 0) {
                return cmp;
            }
            return three.compareTo(tp.three);
        }
    }
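
    To see why the grouping comparator matters, here is a tiny standalone sketch (hypothetical values; it assumes TextPair and HouseMergeAndSplit are compiled together in the same package). The full TextPair ordering sorts by all three fields, while Comp1 from the first job treats two keys with the same date and cookie as equal, which is what lets all house ids for one visitor on one day reach a single reduce() call:

    public class SecondarySortDemo {
        public static void main(String[] args) {
            TextPair a = new TextPair("2013-09-01", "cookieA", "bj0002");
            TextPair b = new TextPair("2013-09-01", "cookieA", "bj0001");

            // Full key order (TextPair.compareTo): a sorts after b because the
            // third field (house id) differs.
            System.out.println(a.compareTo(b) > 0);                           // true

            // Grouping comparator from the first job: same (date, cookie) => same group.
            System.out.println(new HouseMergeAndSplit.Comp1().compare(a, b)); // 0
        }
    }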


    TextPairSecond

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    /*
     * Composite key: (house id, association weight), used by the fourth job so
     * that the values for one house can be sorted by weight.
     */
    public class TextPairSecond implements WritableComparable<TextPairSecond> {
        private Text first;
        private FloatWritable second;

        public TextPairSecond() {
            set(new Text(), new FloatWritable());
        }
        public TextPairSecond(String first, float second) {
            set(new Text(first), new FloatWritable(second));
        }
        public TextPairSecond(Text first, FloatWritable second) {
            set(first, second);
        }
        public void set(Text first, FloatWritable second) {
            this.first = first;
            this.second = second;
        }
        public Text getFirst() {
            return first;
        }
        public FloatWritable getSecond() {
            return second;
        }
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }
        public int compareTo(TextPairSecond tp) {
            int cmp = first.compareTo(tp.first);
            if (cmp != 0) {
                return cmp;
            }
            return second.compareTo(tp.second);
        }
    }
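
    The fourth job (shown later) sorts these keys with a comparator, KeyComp, that negates the comparison on the weight, so the strongest associations come out of the reducer first. A minimal standalone sketch (made-up values) of that descending-weight rule, applied here with a plain java.util comparator:

    import java.util.Arrays;
    import java.util.Comparator;

    public class DescendingWeightDemo {
        public static void main(String[] args) {
            TextPairSecond[] keys = {
                new TextPairSecond("id1", 0.5f),
                new TextPairSecond("id1", 2.0f),
                new TextPairSecond("id1", 1.25f)
            };
            // Same rule as KeyComp in HouseRowToCol: ascending on the house id,
            // descending on the weight (note the minus sign).
            Arrays.sort(keys, new Comparator<TextPairSecond>() {
                public int compare(TextPairSecond t1, TextPairSecond t2) {
                    int comp = t1.getFirst().compareTo(t2.getFirst());
                    if (comp != 0) return comp;
                    return -t1.getSecond().compareTo(t2.getSecond());
                }
            });
            for (TextPairSecond k : keys) {
                System.out.println(k.getFirst() + " " + k.getSecond()); // id1 2.0 / id1 1.25 / id1 0.5
            }
        }
    }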

    Second MR job

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Counts how many times two houses appear together.
     * Input:
     *   date  house1  house2
     *
     * Output:
     *   date  house1  house2  co-occurrence count
     */
    public class HouseCount {

        public static class TokenizerMapper
                extends Mapper<LongWritable, Text, Text, IntWritable> {
            IntWritable iw = new IntWritable(1);
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // the whole line "date house1 house2" is the key, 1 is the count
                context.write(value, iw);
            }
        }

        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable iw : values) {
                    sum += iw.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseCount <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseCount");
            job.setNumReduceTasks(2);
            job.setJarByClass(HouseCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // reducer and its output types
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }


    Third MR job

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Aggregates the per-day co-occurrence counts of roughly the last three
     * months, applying a time-decay factor, and finally emits each pair both as
     * "a b" and transposed as "b a".
     * Input:
     *   date  house1  house2  co-occurrence count
     *
     * Output:
     *   house1  house2  decayed co-occurrence count (the decay factor differs per day)
     */
    public class HouseCountHz {

        public static class HouseCountHzMapper
                extends Mapper<LongWritable, Text, Text, FloatWritable> {
            Text keyv = new Text();
            FloatWritable valuev = new FloatWritable();

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] s = value.toString().split(" ");
                keyv.set(s[1] + " " + s[2]); // house1, house2
                Calendar date1 = Calendar.getInstance();
                Calendar d2 = Calendar.getInstance();
                Date b = null;
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                try {
                    b = sdf.parse(s[0]);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                d2.setTime(b);
                long n = date1.getTimeInMillis();
                long birth = d2.getTimeInMillis();
                long sss = n - birth;
                int day = (int) (sss / (3600 * 24 * 1000));       // days between the record's date and today
                float factor = 1 / (1 + (float) (day - 1) / 10);  // decay factor
                valuev.set(Float.parseFloat(s[3]) * factor);
                context.write(keyv, valuev);
            }
        }

        public static class HouseCountHzReducer
                extends Reducer<Text, FloatWritable, Text, FloatWritable> {
            FloatWritable result = new FloatWritable();
            Text keyreverse = new Text();

            public void reduce(Text key, Iterable<FloatWritable> values, Context context)
                    throws IOException, InterruptedException {
                float sum = 0;
                for (FloatWritable iw : values) {
                    sum += iw.get();
                }
                result.set(sum);
                // emit the pair in both directions so the next job can group by either house
                String[] keys = key.toString().split(" ");
                keyreverse.set(keys[1] + " " + keys[0]);
                context.write(key, result);
                context.write(keyreverse, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseCountHz <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseCountHz");
            job.setNumReduceTasks(2);
            job.setJarByClass(HouseCountHz.class);
            job.setMapperClass(HouseCountHzMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FloatWritable.class);
            // reducer and its output types
            job.setReducerClass(HouseCountHzReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FloatWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }


    Fourth MR job

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Input:
     *   house1  house2  co-occurrence count
     *
     * Output:
     *   house1  house2,house3,house4  (sorted by count, descending)
     */
    public class HouseRowToCol {

        public static class Partitioner1 extends Partitioner<TextPairSecond, Text> {
            // Partition on the house id only, so that every record for the same
            // house reaches the same reducer (the grouping comparator below also
            // groups on the house id only).
            @Override
            public int getPartition(TextPairSecond key, Text value, int numPartition) {
                return Math.abs(key.getFirst().toString().hashCode() * 127) % numPartition;
            }
        }

        // grouping: keys with the same house id go to the same reduce() call
        public static class Comp1 extends WritableComparator {
            public Comp1() {
                super(TextPairSecond.class, true);
            }
            @SuppressWarnings("unchecked")
            public int compare(WritableComparable a, WritableComparable b) {
                TextPairSecond t1 = (TextPairSecond) a;
                TextPairSecond t2 = (TextPairSecond) b;
                return t1.getFirst().compareTo(t2.getFirst());
            }
        }

        // sorting: ascending on the house id, descending on the weight
        public static class KeyComp extends WritableComparator {
            public KeyComp() {
                super(TextPairSecond.class, true);
            }
            @SuppressWarnings("unchecked")
            public int compare(WritableComparable a, WritableComparable b) {
                TextPairSecond t1 = (TextPairSecond) a;
                TextPairSecond t2 = (TextPairSecond) b;
                int comp = t1.getFirst().compareTo(t2.getFirst());
                if (comp != 0)
                    return comp;
                return -t1.getSecond().compareTo(t2.getSecond());
            }
        }

        public static class HouseRowToColMapper
                extends Mapper<LongWritable, Text, TextPairSecond, Text> {
            Text houseid1 = new Text();
            Text houseid2 = new Text();
            FloatWritable weight = new FloatWritable();

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String s[] = value.toString().split(" ");
                weight.set(Float.parseFloat(s[2]));
                houseid1.set(s[0]);
                houseid2.set(s[1]);
                TextPairSecond tp = new TextPairSecond(houseid1, weight);
                context.write(tp, houseid2);
            }
        }

        public static class HouseRowToColReducer
                extends Reducer<TextPairSecond, Text, Text, Text> {
            Text valuev = new Text();

            public void reduce(TextPairSecond key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                Text keyv = key.getFirst();
                // values already arrive in descending weight order, so just join them
                Iterator<Text> it = values.iterator();
                StringBuilder sb = new StringBuilder(it.next().toString());
                while (it.hasNext()) {
                    sb.append("," + it.next().toString());
                }
                valuev.set(sb.toString());
                context.write(keyv, valuev);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseRowToCol <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseRowToCol");
            job.setNumReduceTasks(4);
            job.setJarByClass(HouseRowToCol.class);
            job.setMapperClass(HouseRowToColMapper.class);
            job.setMapOutputKeyClass(TextPairSecond.class);
            job.setMapOutputValueClass(Text.class);
            // partitioner
            job.setPartitionerClass(Partitioner1.class);
            // group by house id after partitioning
            job.setGroupingComparatorClass(Comp1.class);
            job.setSortComparatorClass(KeyComp.class);
            // reducer and its output types
            job.setReducerClass(HouseRowToColReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
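
    Finally, a minimal sketch of how the four jobs might be chained end to end. The jar name recommend.jar and all HDFS paths are assumptions for illustration; each stage is launched as a separate "hadoop jar" process, because every main() above calls System.exit and so cannot simply be invoked in sequence inside one JVM:

    import java.io.IOException;

    public class RecommendPipeline {
        // Launch one stage as its own "hadoop jar" process and wait for it to finish.
        static void run(String mainClass, String in, String out)
                throws IOException, InterruptedException {
            Process p = new ProcessBuilder("hadoop", "jar", "recommend.jar", mainClass, in, out)
                    .inheritIO()
                    .start();
            if (p.waitFor() != 0) {
                throw new RuntimeException(mainClass + " failed");
            }
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical paths: each stage reads the previous stage's output directory.
            run("HouseMergeAndSplit", "/logs/raw",               "/tmp/pairs-by-day");
            run("HouseCount",         "/tmp/pairs-by-day",       "/tmp/pair-counts-by-day");
            run("HouseCountHz",       "/tmp/pair-counts-by-day", "/tmp/pair-weights");
            run("HouseRowToCol",      "/tmp/pair-weights",       "/output/recommendations");
        }
    }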