1、wordcount打包在集群上运行
点击idea右侧的mavenproject
成功之后:
然后重命名为1.jar上传到我们的linux集群中
之后在linux中运行刚刚上传的jar文件
之后就会成功运行了。
2、hadoop序列化
首先神魔是序列化:
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
为神魔要进行序列化:
一般来说, “活的" 对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络.上的另外-台计算机。 然而序列化可以存储“活的"对象,可以将“活的”对象发送到远程计算机。
为神魔不用java的序列化:
Java的序列化是一个重量级序列化框架 (Serializable) ,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header, 继承体系等),不便于 在网络中高效传输。所以,Hadoop自己开发了 -套序列化机制(Writable) 。
Hadoop序列化的特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)可扩展:随着通信协议的升级而可升级.
(4)互操作:支持多语言的交互
本次代码的目标是:
统计每一个手机号耗费的总上行流量、下行流量、总流量
原始的数据是这样的:
代码如下:
FlowBean.java
package flow; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; //准备一个空参构造器 public FlowBean() {} public void set(long upFlow,long downFlow) { this.downFlow=downFlow; this.upFlow=upFlow; this.sumFlow=upFlow+downFlow; } @Override public String toString() { return upFlow+" "+downFlow+" "+sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } //序列化方法 //提供数据的出口 public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //反序列化方法 //框架提供的数据来源 public void readFields(DataInput dataInput) throws IOException { upFlow=dataInput.readLong(); downFlow=dataInput.readLong(); sumFlow=dataInput.readLong(); } //这两个方法里面的内容顺序要一样uds, }
FlowMapper.java
package flow; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> { private Text phone=new Text(); private FlowBean flow=new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] files=value.toString().split(" "); phone.set(files[1]); Long upFlow=Long.parseLong(files[files.length-3]); Long downFlow=Long.parseLong(files[files.length-2]); flow.set(upFlow,downFlow); context.write(phone,flow); } }
FlowReduce.java
package flow; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean>{ private FlowBean sumFlow=new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sumUpFlow=0; long sumDownFLow=0; for(FlowBean value:values) { sumUpFlow+=value.getUpFlow(); sumDownFLow+=value.getDownFlow(); } sumFlow.set(sumUpFlow,sumDownFLow); context.write(key,sumFlow); } }
FlowDriver.java
package flow; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1、获取job实例 Job job=Job.getInstance(new Configuration()); //2、设置类路径 job.setJarByClass(FlowDriver.class); //3、设置Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4、设置输入输入输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //5、设置输入输出路径 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //6、进行提交 boolean b=job.waitForCompletion(true); System.exit(b ? 0:1); } }
运行成功代表是:
运行之后的数据:
3、MapReduce框架原理(并行度决定机制 )
mapreduce的数据流。