MapReduce Basics 2: Traffic Monitoring

    3. Traffic monitoring summary (using the built-in LongWritable/Text types)

    HDFS file path: /tmp/flow.txt
    File contents:
    13770759991	50	100	25	400
    13770759991	800	600	500	100
    13770759992	400	300	250	1400
    13770759992	800	1200	600	900

    Field meanings:
    phone number	upstream bytes	downstream bytes	upload bytes	download bytes
    phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes

    Code:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowTest {
    
    	public static void main(String[] args) {
    		// Input and output paths are taken from the command line
    		Path fromPath = new Path(args[0]);
    		Path toPath = new Path(args[1]);
    		
    		try {
    			Configuration conf = new Configuration();
    			Job job = Job.getInstance(conf);
    			job.setJarByClass(FlowTest.class);
    			
    			FileInputFormat.addInputPath(job, fromPath);
    			FileOutputFormat.setOutputPath(job, toPath);
    			
    			job.setMapperClass(FlowMapper.class);
    			job.setReducerClass(FlowReducer.class);
    			
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(Text.class);
    			
    			job.setOutputKeyClass(Text.class);
    			job.setOutputValueClass(Text.class);
    			
    			// Submit the job and block until it finishes
    			try {
    				job.waitForCompletion(true);
    			} catch (ClassNotFoundException | InterruptedException e) {
    				e.printStackTrace();
    			}
    			
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    		
    
    	}
    
    }
    
    /*
    phone number	upstream	downstream	upload	download
    phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes
    13770759991	50L	100L	25L	400L
    13770759991	800L	600L	500L	100L
    13770759992	400L	300L	250L	1400L
    13770759992	800L	1200L	600L	900L
    */
    class FlowMapper extends Mapper<LongWritable,Text,Text,Text>{
    
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		// Fields are tab-separated: phoneNum, uppackBytes, downpackBytes, uploadBytes, downloadBytes
    		String[] line = value.toString().split("\t");
    		String phoneNum = line[0];
    		long uppackBytes = Long.parseLong(line[1]);
    		long downpackBytes = Long.parseLong(line[2]);
    		long uploadBytes = Long.parseLong(line[3]);
    		long downloadBytes = Long.parseLong(line[4]);
    		// Pack the four counters into a single "-"-delimited Text value
    		context.write(new Text(phoneNum), new Text(uppackBytes + "-" + downpackBytes + "-" + uploadBytes + "-" + downloadBytes));
    	}
    
    	
    }
    
    class FlowReducer extends Reducer<Text,Text,Text,Text>{
    
    	@Override
    	protected void reduce(Text phoneNum, Iterable<Text> values, Context context)
    			throws IOException, InterruptedException {
    		long sumUppack = 0L;
    		long sumDownpack = 0L;
    		long sumUpload = 0L;
    		long sumDownload = 0L;
    		// Each value has the form "up-down-upload-download"; split and accumulate
    		for (Text t : values) {
    			String[] line = t.toString().split("-");
    			sumUppack += Long.parseLong(line[0]);
    			sumDownpack += Long.parseLong(line[1]);
    			sumUpload += Long.parseLong(line[2]);
    			sumDownload += Long.parseLong(line[3]);
    		}
    		
    		context.write(phoneNum, new Text(sumUppack + "-" + sumDownpack + "-" + sumUpload + "-" + sumDownload));
    	}
    	
    }
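
    One optional refinement, not in the original post: because FlowReducer's input key/value types match the map output types (Text, Text) and the aggregation is an associative sum, the same class can also be registered as a combiner to pre-aggregate on the map side and shrink the shuffle. A one-line sketch of what would be added to the job setup in main():

    // Hypothetical addition to the job setup in main():
    job.setCombinerClass(FlowReducer.class);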

    Output:

    Export the project as flow.jar and upload it to the /opt directory on the server.
    Run:
    hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out"
    
    Then run:
    hadoop fs -ls /tmp/flow/out/*   to list the output files.
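
    The original post showed the results as a screenshot. Given the four sample records above, the reducer output (viewable with hadoop fs -cat /tmp/flow/out/part-r-00000, the standard reducer output file name) should contain the per-number sums:

    13770759991	850-700-525-500
    13770759992	1200-1500-850-2300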
    
    

    4. Traffic monitoring summary (using a custom Writable class, NetflowWritable)

    Instead of packing the counters into a delimited string, this version serializes them with a custom Writable, which avoids the string splitting and parsing in the reducer.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class NetflowTest {
    
    	public static void main(String[] args) {
    		// Input and output paths are taken from the command line
    		Path fromPath = new Path(args[0]);
    		Path toPath = new Path(args[1]);
    		
    		try {
    			Configuration conf = new Configuration();
    			Job job = Job.getInstance(conf);
    			job.setJarByClass(NetflowTest.class);
    			
    			
    			FileInputFormat.addInputPath(job, fromPath);
    			FileOutputFormat.setOutputPath(job, toPath);
    			
    			job.setMapperClass(NetflowMapper.class);
    			job.setReducerClass(NetflowReducer.class);
    			
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(NetflowWritable.class);
    			
    			// Must match the reducer's output key type (it writes the phone number as Text)
    			job.setOutputKeyClass(Text.class);
    			job.setOutputValueClass(NetflowWritable.class);
    			
    			// Submit the job and block until it finishes
    			try {
    				job.waitForCompletion(true);
    			} catch (ClassNotFoundException | InterruptedException e) {
    				e.printStackTrace();
    			}
    			
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    
    
    class NetflowWritable implements Writable{
    
    	private long uppackBytes;
    	private long downpackBytes;
    	private long uploadBytes;
    	private long downloadBytes;
    	
    	// A no-argument constructor is required: Hadoop instantiates the class
    	// reflectively during deserialization, and the job fails at runtime without it
    	public NetflowWritable(){}
    	
    	public NetflowWritable(long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {
    		this.uppackBytes = uppackBytes;
    		this.downpackBytes = downpackBytes;	
    		this.uploadBytes = uploadBytes;
    		this.downloadBytes = downloadBytes;
    	}
    	
    
    	public long getUppackBytes() {
    		return uppackBytes;
    	}
    
    	public long getDownpackBytes() {
    		return downpackBytes;
    	}
    
    	public long getUploadBytes() {
    		return uploadBytes;
    	}
    
    	public long getDownloadBytes() {
    		return downloadBytes;
    	}
        
    	public void set( long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {
    		this.uppackBytes = uppackBytes;
    		this.downpackBytes = downpackBytes;	
    		this.uploadBytes = uploadBytes;
    		this.downloadBytes = downloadBytes;
    	}
    	
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		// Deserialize the four counters in the same order write() emits them
    		this.uppackBytes = in.readLong();
    		this.downpackBytes = in.readLong();
    		this.uploadBytes = in.readLong();
    		this.downloadBytes = in.readLong();
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		// Serialize the four counters; the order must match readFields()
    		out.writeLong(uppackBytes);
    		out.writeLong(downpackBytes);
    		out.writeLong(uploadBytes);
    		out.writeLong(downloadBytes);
    	}
    	
    	
    	// toString() determines how TextOutputFormat renders the value in the output file
    	@Override
    	public String toString() {
    		return "NetflowWritable [uppackBytes=" + uppackBytes + ",downpackBytes=" + downpackBytes
    				+ ",uploadBytes=" + uploadBytes + ",downloadBytes=" + downloadBytes + "]";
    	}
    }
    
    
    class NetflowMapper extends Mapper<LongWritable,Text,Text,NetflowWritable>{
    	private String phoneNum;
    	private long uppackBytes;
    	private long downpackBytes;
    	private long uploadBytes;
    	private long downloadBytes;
    	
    	
    	// Reuse a single NetflowWritable instance; context.write() serializes it immediately
    	NetflowWritable nf = new NetflowWritable();
    	
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		// Fields are tab-separated: phoneNum, uppackBytes, downpackBytes, uploadBytes, downloadBytes
    		String[] line = value.toString().split("\t");
    		phoneNum  = line[0];
    		uppackBytes = Long.parseLong(line[1]);
    		downpackBytes = Long.parseLong(line[2]);
    		uploadBytes = Long.parseLong(line[3]);
    		downloadBytes = Long.parseLong(line[4]);
    		nf.set( uppackBytes, downpackBytes, uploadBytes, downloadBytes);
    		context.write(new Text(phoneNum), nf);
    		
    	}
    	
    }
    
    
    
    class NetflowReducer extends Reducer<Text,NetflowWritable,Text,NetflowWritable>{
    	private NetflowWritable nf;
    
    	@Override
    	protected void reduce(Text phoneNum, Iterable<NetflowWritable> flows, Context context)
    			throws IOException, InterruptedException {
    		long uppackBytes = 0L;
    		long downpackBytes = 0L;
    		long uploadBytes = 0L;
    		long downloadBytes = 0L;
    		
    		// Accumulate the four counters across all records for this phone number
    		for (NetflowWritable nw : flows) {
    			uppackBytes += nw.getUppackBytes();
    			downpackBytes += nw.getDownpackBytes();
    			uploadBytes  += nw.getUploadBytes();
    			downloadBytes += nw.getDownloadBytes();
    		}
    		
    		nf = new NetflowWritable(uppackBytes,downpackBytes,uploadBytes,downloadBytes);
    		context.write(phoneNum, nf);
    	}
    	
    }
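
    A quick way to sanity-check the Writable contract outside Hadoop (a sketch, not part of the original post, assuming it sits in the same package as NetflowWritable) is a serialization round trip with the plain java.io stream classes; it also demonstrates why the no-argument constructor is needed:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class NetflowWritableRoundTrip {
    	public static void main(String[] args) throws IOException {
    		NetflowWritable original = new NetflowWritable(850L, 700L, 525L, 500L);
    
    		// Serialize with write(), exactly as the framework would between map and reduce
    		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    		original.write(new DataOutputStream(bytes));
    
    		// Deserialize into a fresh instance -- this is where the no-arg constructor is used
    		NetflowWritable copy = new NetflowWritable();
    		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    
    		System.out.println(copy);
    		// NetflowWritable [uppackBytes=850,downpackBytes=700,uploadBytes=525,downloadBytes=500]
    	}
    }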
    

      

    Output:

    Export the project as netflow.jar and upload it to the /opt directory on the server.
    Run:
    hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out"
    
    Then run:
    hadoop fs -ls /tmp/netflow/out/*   to list the output files.
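
    As before, the screenshot is omitted. With the output key class set to Text to match the reducer (as fixed above), hadoop fs -cat /tmp/netflow/out/part-r-00000 should show each phone number followed by the summed record rendered via toString():

    13770759991	NetflowWritable [uppackBytes=850,downpackBytes=700,uploadBytes=525,downloadBytes=500]
    13770759992	NetflowWritable [uppackBytes=1200,downpackBytes=1500,uploadBytes=850,downloadBytes=2300]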
