  • Hadoop programming: analyzing the distribution of CSDN registration email domains

    Copyright notice: this is an original post by the author and may not be reposted without the author's permission.

    https://blog.csdn.net/jdh99/article/details/37565825

    Hadoop programming: analyzing the distribution of CSDN registration email domains


    Blog link: http://blog.csdn.net/jdh99, author: jdh. Please credit the source when reposting.


    Environment:

    Host: Ubuntu 10.04

    Hadoop version: 1.2.1

    IDE: Eclipse 4.4.0


    Overview:

    Task: the raw data set contains 6,428,632 records. Count the registrations per email domain and sort the domains by number of users, from largest to smallest.

    Analysis: Hadoop's built-in shuffle sorts records by key. To order the results by value, a secondary sort is needed (see the sketch after the steps below).

    Steps:

    1. Job 1: count the number of users per registration email domain, let the default key-based sort order the output, and save it to HDFS.

    2. Job 2: apply a secondary sort to job 1's output, ordering by value from largest to smallest.
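
    To make the value-sort requirement concrete before diving into the MapReduce code, here is a minimal plain-Java sketch (not Hadoop code, and only for illustration) of ordering (domain, count) pairs by count, descending. Inside Hadoop, job 2 achieves the same effect by folding the count into a composite key (KeyMy) and supplying a comparator (SortMy) that orders by that count:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class ValueSortSketch 
    {
    	public static void main(String[] args) 
    	{
    		// sample figures taken from the results listed below
    		Map<String, Integer> counts = new HashMap<String, Integer>();
    		counts.put("qq.com", 1976196);
    		counts.put("163.com", 1766927);
    		counts.put("126.com", 807895);
    		
    		// sort by value, descending - this is what the secondary sort does inside Hadoop
    		List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
    		Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() 
    		{
    			public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) 
    			{
    				return b.getValue().compareTo(a.getValue());
    			}
    		});
    		
    		for (Map.Entry<String, Integer> e : entries) 
    		{
    			System.out.println(e.getKey() + "\t" + e.getValue());
    		}
    	}
    }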


    Results:

    There are 24 email domains with more than 10,000 users each:

    qq.com    1976196
    163.com    1766927
    126.com    807895
    sina.com    351596
    yahoo.com.cn    205491
    hotmail.com    202948
    gmail.com    186843
    sohu.com    104736
    yahoo.cn    87048
    tom.com    72365
    yeah.net    53295
    21cn.com    50710
    vip.qq.com    35119
    139.com    29207
    263.net    24779
    sina.com.cn    19156
    live.cn    18920
    sina.cn    18601
    yahoo.com    18454
    foxmail.com    16432
    163.net    15176
    msn.com    14211
    eyou.com    13372
    yahoo.com.tw    10810


    Source code:


    Job 1: count the number of users per registration email domain

    CsdnData.java

    package com.bazhangkeji.hadoop;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class CsdnData 
    {
    	public static void main(String[] args) throws Exception 
    	{
    		Configuration conf = new Configuration();
    		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (otherArgs.length != 2) 
    		{
    			System.err.println("Usage: csdndata <in> <out>");
    			System.exit(2);
    		}
    		Job job = new Job(conf, "csdndata");
    		
    		job.setJarByClass(CsdnData.class);
    		job.setMapperClass(MapData.class);
    		
    		job.setReducerClass(ReducerData.class); 
    		job.setOutputKeyClass(Text.class);
    		
    		job.setOutputValueClass(IntWritable.class);
    		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
      	}
    }
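
    One optional tweak, not part of the original job and shown here only as a sketch: because ReducerData does nothing but sum IntWritable counts, it can also be registered as a combiner so partial sums are computed on the map side, which reduces shuffle traffic for the 6,428,632 input records. The line would go next to the other job.set* calls in main():

    		// optional: reuse the sum reducer as a combiner (local aggregation per mapper)
    		job.setCombinerClass(ReducerData.class);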
    


    MapData.java

    package com.bazhangkeji.hadoop;
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    
    public class MapData extends Mapper<Object, Text, Text, IntWritable>
    {
    	IntWritable one = new IntWritable(1);
      	Text word = new Text();
    
      	public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
    	{
    		String line = value.toString();
    		
    		// the email domain starts after the last '@' in the record
    		int index = line.lastIndexOf('@');
    		if (index != -1)
    		{
    			// everything after '@', trimmed and lower-cased, becomes the map key
    			word.set(line.substring(index + 1).trim().toLowerCase());
      			context.write(word, one);
    		}
      	}
    }
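
    A standalone illustration of what this mapper emits for a single record. The sample line below is hypothetical (the exact CSDN record layout is not shown in this post); the extraction only relies on the registration email being the last '@' in the line:

    public class ExtractDomainSketch 
    {
    	public static void main(String[] args) 
    	{
    		// hypothetical sample record; only the trailing email address matters here
    		String line = "someuser # somepassword # someuser@QQ.com ";
    		
    		int index = line.lastIndexOf('@');
    		if (index != -1)
    		{
    			// everything after the last '@', trimmed and lower-cased, becomes the map key
    			System.out.println(line.substring(index + 1).trim().toLowerCase());   // prints: qq.com
    		}
    	}
    }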
    


    ReducerData.java

    package com.bazhangkeji.hadoop;
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    
    public class ReducerData extends Reducer<Text,IntWritable,Text,IntWritable> 
    {
    	IntWritable result = new IntWritable();
    
      	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
    	{
        	int sum = 0;
        	for (IntWritable val : values) 
    		{
          		sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
      	}
    }
    


    Job 2: secondary sort of job 1's output, ordering by value from largest to smallest

    SortSecond.java

    package com.bazhangkeji.hadoop2;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class SortSecond 
    {
    	public static void main(String[] args) throws Exception 
    	{
    		Configuration conf = new Configuration();
    		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (otherArgs.length != 2) 
    		{
    			System.err.println("Usage: csdndata <in> <out>");
    			System.exit(2);
    		}
    		Job job = new Job(conf, "sortsecond");
    		job.setJarByClass(SortSecond.class);
    		
    		job.setMapperClass(MapSecond.class);
    		job.setReducerClass(ReduceSecond.class); 
    		
    		job.setSortComparatorClass(SortMy.class); // set the custom secondary-sort comparator
    		
    		job.setOutputKeyClass(KeyMy.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
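
    A side note on the driver above: in Hadoop 1.x, setOutputKeyClass/setOutputValueClass also determine the map output types unless setMapOutputKeyClass/setMapOutputValueClass are set explicitly. Since MapSecond emits KeyMy while ReduceSecond emits Text, a slightly more explicit variant of the type declarations (a sketch of the same configuration) would be:

    		// declare the map-side and reduce-side output types separately,
    		// since MapSecond emits KeyMy but ReduceSecond emits Text
    		job.setMapOutputKeyClass(KeyMy.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);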


    MapSecond.java

    package com.bazhangkeji.hadoop2;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    
    public class MapSecond extends Mapper<LongWritable, Text, KeyMy, IntWritable>
    {
    	IntWritable one = new IntWritable(1);
      	Text word = new Text();
      	KeyMy keymy = new KeyMy();
    
      	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    	{
      		String str_in = value.toString();
      		int index = 0;
      		
      		// job 1's output separates the domain and the count with a tab
      		index = str_in.indexOf('\t');
      		if (value.toString().length() > 3 && index != -1)
      		{
    	  		String str1 = str_in.substring(0, index);
    	  		String str2 = str_in.substring(index + 1);
    	  		
    	  		if (str1.length() != 0 && str2.length() != 0)
    	  		{
    	  			one.set(Integer.parseInt(str2));
    	  			word.set(str1);
    				keymy.setFirstKey(word);
    				keymy.setSecondKey(one);
    	  			context.write(keymy, one);
    	  		}
      		}
      	}
    }
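
    For reference, a standalone sketch of what this mapper does with one line of job 1's output (assumed to be in TextOutputFormat's default "<domain><TAB><count>" layout):

    public class ParseJob1LineSketch 
    {
    	public static void main(String[] args) 
    	{
    		String line = "qq.com\t1976196";   // sample line from job 1's output
    		
    		int index = line.indexOf('\t');
    		String domain = line.substring(0, index);                  // becomes KeyMy.firstKey
    		int count = Integer.parseInt(line.substring(index + 1));   // becomes KeyMy.secondKey
    		
    		System.out.println(domain + " -> " + count);
    	}
    }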
    


    ReduceSecond.java

    package com.bazhangkeji.hadoop2;
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    
    public class ReduceSecond extends Reducer<KeyMy,IntWritable,Text,IntWritable> 
    {
    	IntWritable result = new IntWritable();
    
      	public void reduce(KeyMy key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
    	{
        	context.write(key.getFirstKey(), key.getSecondKey());
      	}
    }
    


    KeyMy.java

    package com.bazhangkeji.hadoop2;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * Custom composite key: (email domain, user count).
     */
    public class KeyMy implements WritableComparable<KeyMy>{
        private static final Logger logger = LoggerFactory.getLogger(KeyMy.class);
        private Text firstKey;
        private IntWritable secondKey;
        public KeyMy() {
            this.firstKey = new Text();
            this.secondKey = new IntWritable();
        }
        public Text getFirstKey() {
            return this.firstKey;
        }
        public void setFirstKey(Text firstKey) {
            this.firstKey = firstKey;
        }
        public IntWritable getSecondKey() {
            return this.secondKey;
        }
        public void setSecondKey(IntWritable secondKey) {
            this.secondKey = secondKey;
        }
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.firstKey.readFields(dataInput);
            this.secondKey.readFields(dataInput);
        }
        @Override
        public void write(DataOutput outPut) throws IOException {
            this.firstKey.write(outPut);
            this.secondKey.write(outPut);
        }
        /**
         * Custom comparison strategy.
         * Note: this comparator drives MapReduce's first, default sort, i.e. the sort
         * sub-phase on the map side, which takes place in the circular memory buffer
         * (whose size can be tuned via io.sort.mb).
         */
        @Override
        public int compareTo(KeyMy other) {
            logger.info("-------KeyMy flag-------");
            return this.firstKey.compareTo(other.getFirstKey());
        }
    }
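
    A quick round-trip check (a sketch, assumed to live in the same package as KeyMy) showing that write and readFields are symmetric, which is the contract the shuffle relies on when it serializes and deserializes the composite key:

    package com.bazhangkeji.hadoop2;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    
    public class KeyMyRoundTrip 
    {
    	public static void main(String[] args) throws IOException 
    	{
    		KeyMy in = new KeyMy();
    		in.setFirstKey(new Text("qq.com"));
    		in.setSecondKey(new IntWritable(1976196));
    		
    		// serialize the key the same way the framework would
    		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    		in.write(new DataOutputStream(bytes));
    		
    		// deserialize into a fresh instance
    		KeyMy out = new KeyMy();
    		out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    		
    		System.out.println(out.getFirstKey() + "\t" + out.getSecondKey());   // qq.com	1976196
    	}
    }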
    


    SortMy.java

    package com.bazhangkeji.hadoop2;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * Custom secondary-sort comparator: orders KeyMy by its count, descending.
     */
    public class SortMy extends WritableComparator {
        private static final Logger logger = LoggerFactory.getLogger(SortMy.class);
        public SortMy() {
            super(KeyMy.class,true);
        }
        @Override
        public int compare(WritableComparable KeyMyOne,
                WritableComparable KeyMyOther) 
        {
            logger.info("---------enter SortMy flag---------");

            KeyMy c1 = (KeyMy) KeyMyOne;
            KeyMy c2 = (KeyMy) KeyMyOther;

            // negative when c1 has the larger count, so counts sort from largest to smallest
            return c2.getSecondKey().get() - c1.getSecondKey().get();
        }
    }
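
    The subtraction above is fine for counts of this size, but as a general habit an overflow-safe form of the same comparison (a drop-in sketch for the method shown) is:

        @Override
        public int compare(WritableComparable KeyMyOne, WritableComparable KeyMyOther) 
        {
            KeyMy c1 = (KeyMy) KeyMyOne;
            KeyMy c2 = (KeyMy) KeyMyOther;
            
            // negative when c1 has the larger count, so larger counts still sort first
            return Integer.compare(c2.getSecondKey().get(), c1.getSecondKey().get());
        }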
    

    References:

    1. "Hadoop: The Definitive Guide"

    2.  http://zengzhaozheng.blog.51cto.com/8219051/1379271


