zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之分区和合并(十四)

      不多说,直接上代码。

     

    代码

      1 package zhouls.bigdata.myMapReduce.Star;
      2 
      3 
      4 import java.io.IOException;
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.conf.Configured;
      7 import org.apache.hadoop.fs.FileSystem;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.Text;
     10 import org.apache.hadoop.mapreduce.Job;
     11 import org.apache.hadoop.mapreduce.Mapper;
     12 import org.apache.hadoop.mapreduce.Partitioner;
     13 import org.apache.hadoop.mapreduce.Reducer;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 import org.apache.hadoop.util.Tool;
     17 import org.apache.hadoop.util.ToolRunner;
     18 /**
     19  * 
     20  * @function 分别统计出男女明星的最大搜索指数
     21  * @author 小讲
     22  */
     23  
     24  /*
     25 姓名    性别    搜索指数
     26 李易峰    male    32670
     27 朴信惠    female    13309
     28 林心如    female    5242
     29 黄海波    male    5505
     30 成龙    male    7757
     31 刘亦菲    female    14830
     32 angelababy    female    55083
     33 王宝强    male    9472
     34 郑爽    female    9279
     35 周杰伦    male    42020
     36 莫小棋    female    13978
     37 朱一龙    male    10524
     38 宋智孝    female    12494
     39 吴京    male    6684
     40 赵丽颖    female    24174
     41 尹恩惠    female    5985
     42 李金铭    female    5925
     43 关之琳    female    7668
     44 邓超    male    11532
     45 钟汉良    male    8289
     46 周润发    male    4808
     47 甄子丹    male    5479
     48 林妙可    female    5306
     49 柳岩    female    8221
     50 蔡琳    female    7320
     51 张佳宁    female    6628
     52 裴涩琪    female    5658
     53 李晨    male    9559
     54 周星驰    male    11483
     55 杨紫    female    11094
     56 全智贤    female    5336
     57 张柏芝    female    9337
     58 孙俪    female    7295
     59 鲍蕾    female    5375
     60 杨幂    female    20238
     61 刘德华    male    19786
     62 柯震东    male    6398
     63 张国荣    male    5013
     64 王阳    male    5169
     65 李小龙    male    6859
     66 林志颖    male    4512
     67 林正英    male    5832
     68 吴秀波    male    5668
     69 陈伟霆    male    12817
     70 陈奕迅    male    10472
     71 赵又廷    male    5190
     72 张馨予    female    35062
     73 陈晓    male    17901
     74 赵韩樱子    female    7077
     75 乔振宇    male    8877
     76 宋慧乔    female    5708
     77 韩艺瑟    female    5426
     78 张翰    male    7012
     79 谢霆锋    male    6654
     80 刘晓庆    female    5553
     81 陈翔    male    7999
     82 陈学冬    male    8829
     83 秋瓷炫    female    6504
     84 王祖蓝    male    6662
     85 吴亦凡    male    16472
     86 陈妍希    female    32590
     87 倪妮    female    9278
     88 高梓淇    male    7101
     89 赵奕欢    female    7197
     90 赵本山    male    12655
     91 高圆圆    female    13688
     92 陈赫    male    6820
     93 鹿晗    male    32492
     94 贾玲    female    5304
     95 宋佳    female    6202
     96 郭碧婷    female    5295
     97 唐嫣    female    12055
     98 杨蓉    female    10512
     99 李钟硕    male    26278
    100 郑秀晶    female    10479
    101 熊黛林    female    26732
    102 金秀贤    male    11370
    103 古天乐    male    4954
    104 黄晓明    male    10964
    105 李敏镐    male    10512
    106 王丽坤    female    5501
    107 谢依霖    female    7000
    108 陈冠希    male    9135
    109 范冰冰    female    13734
    110 姚笛    female    6953
    111 彭于晏    male    14136
    112 张学友    male    4578
    113 谢娜    female    6886
    114 胡歌    male    8015
    115 古力娜扎    female    8858
    116 黄渤    male    7825
    117 周韦彤    female    7677
    118 刘诗诗    female    16548
    119 郭德纲    male    10307
    120 郑恺    male    21145
    121 赵薇    female    5339
    122 李连杰    male    4621
    123 宋茜    female    11164
    124 任重    male    8383
    125 李若彤    female    9968
    126 
    127 
    128 得到:
    129 angelababy    female    55083
    130 周杰伦    male    42020
    131 */
    132 public class Star extends Configured implements Tool{
    133     /**
    134      * @function Mapper 解析明星数据
    135      * @input key=偏移量  value=明星数据
    136      * @output key=gender value=name+hotIndex
    137      */
    138     public static class ActorMapper extends Mapper<Object,Text,Text,Text>{
    139                     //在这个例子里,第一个参数Object是Hadoop根据默认值生成的,一般是文件块里的一行文字的行偏移数,这些偏移数不重要,在处理时候一般用不上
    140         public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
    141         //拿:周杰伦    male    42020
    142             //value=name+gender+hotIndex
    143             String[] tokens = value.toString().split("	");//使用分隔符	,将数据解析为数组 tokens
    144             String gender = tokens[1].trim();//性别,trim()是去除两边空格的方法
    145                         //tokens[0]        tokens[1]        tokens[2]        
    146                         //周杰伦        male            42020
    147             String nameHotIndex = tokens[0] + "	" + tokens[2];//名称和关注指数
    148             //输出key=gender value=name+hotIndex
    149             context.write(new Text(gender), new Text(nameHotIndex));//写入gender是k2,nameHotIndex是v2
    150 //            context.write(gender,nameHotIndex);等价        
    151             //将gender和nameHotIndex写入到context中
    152         }
    153     }
    154 
    155     
    156     
    157     /**
    158      * @function Partitioner 根据sex选择分区
    159      */
    160     public static class ActorPartitioner extends Partitioner<Text, Text>{     
    161         @Override
    162         public int getPartition(Text key, Text value, int numReduceTasks){
    163             String sex = key.toString();//按性别分区
    164             
    165             // 默认指定分区 0
    166             if(numReduceTasks==0)
    167                 return 0;
    168             
    169             //性别为male 选择分区0
    170             if(sex.equals("male"))             
    171                 return 0;
    172             //性别为female 选择分区1
    173             if(sex.equals("female"))
    174                 return 1 % numReduceTasks;
    175             //其他性别 选择分区2
    176             else
    177                 return 2 % numReduceTasks;
    178            
    179         }
    180     }
    181 
    182     
    183     
    184     /**
    185      * @function 定义Combiner 合并 Mapper 输出结果
    186      */
    187     public static class ActorCombiner extends Reducer<Text, Text, Text, Text>{
    188         private Text text = new Text();
    189         @Override
    190         public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
    191             int maxHotIndex = Integer.MIN_VALUE;
    192             int hotIndex = 0;
    193             String name="";
    194             for (Text val : values){//星型for循环,即把values的值传给Text val
    195                 String[] valTokens = val.toString().split("\t");
    196                 hotIndex = Integer.parseInt(valTokens[1]);
    197                 if(hotIndex>maxHotIndex){
    198                     name = valTokens[0];
    199                     maxHotIndex = hotIndex;
    200                 }
    201             }
    202             text.set(name+"	"+maxHotIndex);
    203             context.write(key, text);
    204         }
    205     }
    206     
    207     
    208     
    209     /**
    210      * @function Reducer 统计男、女明星最高搜索指数
    211      * @input key=gender  value=name+hotIndex
    212      * @output key=name value=gender+hotIndex(max)
    213      */
    214     public static class ActorReducer extends Reducer<Text,Text,Text,Text>{
    215         @Override
    216         public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
    217             int maxHotIndex = Integer.MIN_VALUE;
    218 
    219             String name = " ";
    220             int hotIndex = 0;
    221             // 根据key,迭代 values 集合,求出最高搜索指数
    222             for (Text val : values){//星型for循环,即把values的值传给Text val
    223                 String[] valTokens = val.toString().split("\t");
    224                 hotIndex = Integer.parseInt(valTokens[1]);
    225                 if (hotIndex > maxHotIndex){
    226                     name = valTokens[0];
    227                     maxHotIndex = hotIndex;
    228                 }
    229             }
    230             context.write(new Text(name), new Text(key + "	"+ maxHotIndex));//写入name是k3,key + "	"+ maxHotIndex是v3
    231 //            context.write(name,key + "	"+ maxHotIndex);//等价            
    232         }
    233     }
    234 
    235     /**
    236      * @function 任务驱动方法
    237      * @param args
    238      * @return
    239      * @throws Exception
    240      */
    241 
    242     public int run(String[] args) throws Exception{
    243         // TODO Auto-generated method stub
    244         
    245         Configuration conf = new Configuration();//读取配置文件,比如core-site.xml等等
    246         Path mypath = new Path(args[1]);//Path对象mypath
    247         FileSystem hdfs = mypath.getFileSystem(conf);//FileSystem对象hdfs
    248         if (hdfs.isDirectory(mypath)){    
    249             hdfs.delete(mypath, true);
    250         }
    251 
    252         Job job = new Job(conf, "star");//新建一个任务
    253         job.setJarByClass(Star.class);//主类
    254         
    255         job.setNumReduceTasks(2);//reduce的个数设置为2
    256         job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner类
    257         
    258         job.setMapperClass(ActorMapper.class);//Mapper
    259         job.setMapOutputKeyClass(Text.class);//map 输出key类型
    260         job.setMapOutputValueClass(Text.class);//map 输出value类型
    261                 
    262         job.setCombinerClass(ActorCombiner.class);//设置Combiner类
    263         
    264         job.setReducerClass(ActorReducer.class);//Reducer
    265         job.setOutputKeyClass(Text.class);//输出结果 key类型
    266         job.setOutputValueClass(Text.class);//输出结果 value类型
    267         
    268         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
    269         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
    270         job.waitForCompletion(true);//提交任务
    271         return 0;
    272     }
    273     
    274     
    275     /**
    276      * @function main 方法
    277      * @param args
    278      * @throws Exception
    279      */
    280     public static void main(String[] args) throws Exception{
    281 //        String[] args0 = { "hdfs://HadoopMaster:9000/star/star.txt",
    282 //                            "hdfs://HadoopMaster:9000/out/star/" };
    283         String[] args0 = { "./data/star/star.txt",
    284                             "./out/star" };
    285         
    286         int ec = ToolRunner.run(new Configuration(), new Star(), args0);
    287         System.exit(ec);
    288     }
    289 }
  • 相关阅读:
    lvs+keepalived集群架构服务
    GlusterFS分布式存储系统
    Zabbix监控平台(三)生产环境案例
    Zabbix监控平台(二)深入了解
    Zabbix监控平台(一)搭建部署与概述
    Memcached数据库缓存
    Mariadb 基于Mycat实现读写分离
    基于mysqld_multi实现MySQL多实例配置
    tomcat-APR配置及三种工作模式简介
    Session服务器之Memcached与Redis
  • 原文地址:https://www.cnblogs.com/zlslch/p/6165047.html
Copyright © 2011-2022 走看看