A problem suddenly came up today: at first only the Xshell connection to hadoop102 failed, while 103 and 104 were fine. I clicked "open" in Xshell a few times, then the VM froze and went "not responding" for a long time, so I had to shut it down. After restarting, the virtual machine still could not be reached, then it froze again and I shut it down again. After several rounds of freezing, none of the three machines could be connected.
After several hours of debugging every kind of configuration, it turned out nothing in my configuration was wrong; the virtual machine itself was lagging, and after a while it recovered on its own.
Afterwards I studied serialization and followed along with a simple example.
** map and reduce may run on two different servers, so passing a bean between them requires serialization: on one side the in-memory data is serialized and stored to disk, and on the other server the on-disk data is deserialized back into memory.
You have to write the bean class yourself.
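The write-then-read round trip can be sketched with plain `java.io` streams, no Hadoop required. This hypothetical `MiniBean` mirrors the Writable pattern the FlowBean below uses: fields are written in a fixed order and read back in exactly the same order (the class and field names here are made up for illustration).

```java
import java.io.*;

// Hypothetical bean mirroring the Writable pattern (not Hadoop's actual interface)
class MiniBean {
    long upFlow;
    long downFlow;

    // Analogous to Writable.write(DataOutput): serialize fields in a fixed order
    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // Analogous to Writable.readFields(DataInput): read back in the same order
    void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }
}

public class RoundTrip {
    public static void main(String[] args) throws IOException {
        MiniBean outBean = new MiniBean();
        outBean.upFlow = 1116;
        outBean.downFlow = 954;

        // Serialize to a byte buffer (stands in for the disk/network hop)
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        outBean.write(new DataOutputStream(buf));

        // Deserialize into a fresh bean on the "other side"
        MiniBean inBean = new MiniBean();
        inBean.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(inBean.upFlow + " " + inBean.downFlow); // prints 1116 954
    }
}
```

The key invariant is that `readFields` must consume fields in the same order and with the same types that `write` produced them; there is no field-name metadata in the stream.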
FlowBean class
/*
 * 1. Define a class that implements the Writable interface
 * 2. Override the serialization and deserialization methods (write / readFields)
 * 3. Provide the no-arg constructor
 * 4. Override toString
 */
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor, required so the framework can instantiate the bean by reflection
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back in exactly the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }
}
FlowMapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Get one line
        String line = value.toString();
        // 2. Split it
        String[] split = line.split(" ");
        // 3. Grab the fields we need
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];
        // 4. Wrap them into the output key/value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        // 5. Write out
        context.write(outK, outV);
    }
}
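Note that the mapper indexes the traffic columns from the end of the line (`split.length - 3`), which keeps the extraction correct even when an optional middle column is missing. A quick stdlib check, using sample lines invented for illustration (the real input format may differ):

```java
public class SplitDemo {
    public static void main(String[] args) {
        // Hypothetical record: id, phone, ip, site, upFlow, downFlow, status
        String line = "1 13736230513 192.196.100.1 www.example.com 2481 24681 200";
        String[] split = line.split(" ");

        String phone = split[1];              // second column
        String up = split[split.length - 3];  // third column from the end
        String down = split[split.length - 2];// second column from the end
        System.out.println(phone + " " + up + " " + down); // prints 13736230513 2481 24681

        // A hypothetical line without the site column still parses correctly
        String line2 = "2 13846544121 192.196.100.2 264 0 200";
        String[] s2 = line2.split(" ");
        System.out.println(s2[1] + " " + s2[s2.length - 3] + " " + s2[s2.length - 2]);
        // prints 13846544121 264 0
    }
}
```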
FlowReducer class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the values and accumulate the totals for this key
        long allUp = 0;
        long allDown = 0;
        for (FlowBean value : values) {
            allUp += value.getUpFlow();
            allDown += value.getDownFlow();
        }
        // 2. Wrap the totals into the output bean
        outV.setUpFlow(allUp);
        outV.setDownFlow(allDown);
        outV.setSumFlow();
        // 3. Write out
        context.write(key, outV);
    }
}
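Between map and reduce, the framework groups the mapper's output by key and hands each reducer call one key with all of its values; the reducer only has to sum them. That group-then-accumulate step can be simulated with a plain `HashMap` (a sketch of the idea only, not Hadoop's actual shuffle, and the records are invented):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Hypothetical mapper output: (phone, up, down); two records share a key
        String[][] mapped = {
            {"13736230513", "2481", "24681"},
            {"13846544121", "264", "0"},
            {"13736230513", "100", "200"},
        };

        // Group by key and accumulate, like the loop inside reduce()
        Map<String, long[]> sums = new LinkedHashMap<>();
        for (String[] rec : mapped) {
            long[] acc = sums.computeIfAbsent(rec[0], k -> new long[2]);
            acc[0] += Long.parseLong(rec[1]); // upFlow
            acc[1] += Long.parseLong(rec[2]); // downFlow
        }

        for (Map.Entry<String, long[]> e : sums.entrySet()) {
            long up = e.getValue()[0], down = e.getValue()[1];
            System.out.println(e.getKey() + " " + up + " " + down + " " + (up + down));
        }
        // prints:
        // 13736230513 2581 24881 27462
        // 13846544121 264 0 264
    }
}
```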
FlowDriver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar
        job.setJarByClass(FlowDriver.class);
        // 3. Associate the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4. Set the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set the input and output paths (backslashes must be escaped in Java string literals)
        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\Flow"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\Flowout1"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Study time: 12:25 to 15:50