zoukankan      html  css  js  c++  java
  • 五 数据组织模式 2) 分区模式 代码

    简单4个分区。 
    1. package com.rocky.mr.partition;
    2. import com.alibaba.fastjson.JSON;
    3. import com.alibaba.fastjson.JSONObject;
    4. import com.rocky.util.TimeUtils;
    5. import org.apache.hadoop.conf.Configurable;
    6. import org.apache.hadoop.conf.Configuration;
    7. import org.apache.hadoop.fs.FileSystem;
    8. import org.apache.hadoop.fs.Path;
    9. import org.apache.hadoop.io.IntWritable;
    10. import org.apache.hadoop.io.LongWritable;
    11. import org.apache.hadoop.io.Text;
    12. import org.apache.hadoop.mapreduce.Job;
    13. import org.apache.hadoop.mapreduce.Mapper;
    14. import org.apache.hadoop.mapreduce.Reducer;
    15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    17. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    18. import java.io.IOException;
    19. import java.net.URI;
    20. import java.net.URISyntaxException;
    21. /**
    22. * Created by Administrator on 2016/4/11.
    23. */
    24. public class MyPartition {
    25. public static final String clazz = "com.spring.aop.StorageManagerStatAspect";
    26. public static final String m_download = "com.systoon.scloud.master.controller.ImageController.download";
    27. public static final String m_upload = "com.systoon.scloud.master.controller.DirectUploadFile.directUploadFile";
    28. /** patrition param */
    29. public static Text word = new Text();
    30. public static Text partitionDownload = new Text("download"); // download 0
    31. public static Text partitionUpload = new Text("upload"); // upload 1
    32. public static Text partitionOther = new Text("others"); // others 2
    33. public static Text partitionCount = new Text("count"); // count 3
    34. public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
    35. @Override
    36. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    37. String line = value.toString();
    38. word.set("1");
    39. context.write(partitionCount, new Text("1"));
    40. if(line.contains(clazz)){
    41. if(line.contains(m_download)){
    42. String tempObject = line.split(clazz)[1];
    43. String tmp = tempObject.substring(1,tempObject.length());
    44. JSONObject jsonObject = JSON.parseObject(tmp);
    45. String method = jsonObject.get("method").toString();
    46. if( method.equals(m_download) ){
    47. context.write(partitionDownload, word);
    48. }
    49. } else if(line.contains(m_upload)) {
    50. String tempObject = line.split(clazz)[1];
    51. String tmp = tempObject.substring(1,tempObject.length());
    52. JSONObject jsonObject = JSON.parseObject(tmp);
    53. String method = jsonObject.get("method").toString();
    54. if( method.equals(m_upload) ){
    55. context.write(partitionUpload, word);
    56. }
    57. } else {
    58. context.write(partitionOther, word);
    59. }
    60. } else {
    61. context.write(partitionOther , word);
    62. }
    63. }
    64. }
    65. public static class PReduce extends Reducer<Text,Text,Text,Text>{
    66. @Override
    67. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    68. // long count = 0l;
    69. // if(key.toString().equals(partitionCount.toString())){
    70. // for (Text test:values){
    71. // count ++;
    72. // }
    73. // word.set(count+"");
    74. // context.write(key,word);
    75. // } else {
    76. // for (Text test:values){
    77. // context.write(key,test);
    78. // }
    79. // word.set(count+"");
    80. // context.write(key,word);
    81. // }
    82. long count = 0l;
    83. for (Text text:values){
    84. count ++;
    85. }
    86. word.set(count + "");
    87. context.write(key,word);
    88. }
    89. }
    90. public static class CustomizationPartition extends HashPartitioner<Text,Text> implements Configurable {
    91. private Configuration conf = null;
    92. public CustomizationPartition(){
    93. }
    94. @Override
    95. public Configuration getConf() {
    96. return conf;
    97. }
    98. @Override
    99. public void setConf(Configuration conf) {
    100. this.conf = conf;
    101. }
    102. public int getPartition(Text key, Text value, int numReduceTasks){
    103. if(key.toString().equals("download")){
    104. return 0;
    105. } else if(key.toString().equals("upload")){
    106. return 1;
    107. } else if(key.toString().equals("count")){
    108. return 2;
    109. } else {
    110. // key.toString().equals("others")
    111. return 3;
    112. }
    113. }
    114. }
    115. public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
    116. Configuration conf = new Configuration();
    117. String outPath = "/test/mapReduce/partition"+ TimeUtils.getStringDate();
    118. // check
    119. final FileSystem filesystem = FileSystem.get(new URI(outPath), conf);
    120. if(filesystem.exists(new Path(outPath))){
    121. filesystem.delete(new Path(outPath), true);
    122. }
    123. Job job = new Job( conf,"rocky_partition");
    124. job.setJarByClass(MyPartition.class);
    125. job.setPartitionerClass(CustomizationPartition.class);
    126. job.setNumReduceTasks(4);
    127. job.setMapperClass(PMapper.class);
    128. job.setReducerClass(PReduce.class);
    129. job.setOutputKeyClass(Text.class);
    130. job.setOutputValueClass(Text.class);
    131. FileInputFormat.addInputPath(job, new Path("/test/mapReduce/source/statistics.log.2016-03-01"));
    132. // FileInputFormat.addInputPath(job, new Path("/test/mapReduce/source/statistics.log.2016-03-02"));
    133. FileOutputFormat.setOutputPath(job, new Path(outPath));
    134. System.exit(job.waitForCompletion(true)?0:1);// 是否正常退出
    135. }
    136. }





    God has given me a gift. Only one. I am the most complete fighter in the world. My whole life, I have trained. I must prove I am worthy of something. rocky_24
  • 相关阅读:
    指出在 spring aop 中 concern 和 cross-cutting concern 的不同之处?
    什么是 spring bean?
    Java 中,Serializable 与 Externalizable 的区别?
    spring DAO 有什么用?
    spring 支持几种 bean scope?
    Spring 应用程序有哪些不同组件?
    什么是切点JoinPoint?
    @Required 注解有什么用?
    用什么命令对一个文件的内容进行统计?(行号、单词数、 字节数) ?
    区分构造函数注入和 setter 注入?
  • 原文地址:https://www.cnblogs.com/rocky24/p/b8b32977f3f3c4eec56ab7a1763b7b6f.html
Copyright © 2011-2022 走看看