zoukankan      html  css  js  c++  java
  • Hadoop基础-MapReduce的常用文件格式介绍

                  Hadoop基础-MapReduce的常用文件格式介绍  

                                        作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

    一.MR文件格式-SequenceFile

    1>.生成SequenceFile文件(SequenceFileOutputFormat)

    The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.
    word.txt 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.sequencefile.output;
     7 
     8 import org.apache.hadoop.io.LongWritable;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.mapreduce.Mapper;
    11 
    12 import java.io.IOException;
    13 
    14 public class SeqMapper extends Mapper<LongWritable, Text , LongWritable, Text> {
    15 
    16     @Override
    17     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    18 
    19         context.write(key,value);
    20 
    21 
    22     }
    23 }
    SeqMapper.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.sequencefile.output;
     7 
     8 import org.apache.hadoop.conf.Configuration;
     9 import org.apache.hadoop.fs.FileSystem;
    10 import org.apache.hadoop.fs.Path;
    11 import org.apache.hadoop.io.LongWritable;
    12 import org.apache.hadoop.io.SequenceFile;
    13 import org.apache.hadoop.io.Text;
    14 import org.apache.hadoop.mapreduce.Job;
    15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    17 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    18 
    19 /**
    20  * 把wc.txt变为SequenceFile
    21  * k-偏移量-LongWritable
    22  * v-一行文本-Text
    23  */
    24 public class SeqApp {
    25 
    26     public static void main(String[] args) throws Exception {
    27 
    28         Configuration conf = new Configuration();
    29         conf.set("fs.defaultFS","file:///");
    30         FileSystem fs = FileSystem.get(conf);
    31         Job job = Job.getInstance(conf);
    32 
    33         job.setJobName("Seq-Out");
    34         job.setJarByClass(SeqApp.class);
    35 
    36         //设置输出格式,这里的输出格式要和咱们Mapper程序的格式要一致哟!
    37         job.setOutputKeyClass(LongWritable.class);
    38         job.setOutputValueClass(Text.class);
    39 
    40         job.setMapperClass(SeqMapper.class);
    41 
    42         FileInputFormat.addInputPath(job, new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\word.txt"));
    43 
    44         Path outPath = new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\seqout");
    45         if (fs.exists(outPath)){
    46             fs.delete(outPath);
    47         }
    48         FileOutputFormat.setOutputPath(job,outPath);
    49 
    50         //设置文件输出格式为SequenceFile
    51         job.setOutputFormatClass(SequenceFileOutputFormat.class);
    52 
    53         //设置SeqFile的压缩类型为块压缩
    54         SequenceFileOutputFormat.setOutputCompressionType(job,SequenceFile.CompressionType.BLOCK);
    55 
    56         //以上设置参数完毕后,我们通过下面这行代码就开始运行job
    57         job.waitForCompletion(true);
    58     }
    59 }

      运行以上代码之后,我们可以去输出目录通过hdfs命令查看生成的SequenceFile文件内容,例如执行“hdfs dfs -text <输出目录>/part-r-00000”,该命令会把SequenceFile解码成文本后输出。

    2>.对SequenceFile文件进行单词统计测试(SequenceFileInputFormat)

      我们就不用去刻意找具体的SequenceFile啦,我们直接用上面生成的SequenceFile进行测试,具体代码如下:

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.sequencefile.input;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.LongWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 
    13 import java.io.IOException;
    14 
    15 public class SeqMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    16 
    17     @Override
    18     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    19 
    20         String line = value.toString();
    21         String[] arr = line.split(" ");
    22         for(String word: arr){
    23             context.write(new Text(word),new IntWritable(1));
    24 
    25         }
    26 
    27 
    28     }
    29 }
    SeqMapper.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.sequencefile.input;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.mapreduce.Reducer;
    11 import java.io.IOException;
    12 
    13 public class SeqReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    14     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    15         Integer sum = 0;
    16         for (IntWritable value : values) {
    17             sum += value.get();
    18         }
    19         context.write(key, new IntWritable(sum));
    20     }
    21 }
    SeqReducer.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.sequencefile.input;
     7 
     8 
     9 import org.apache.hadoop.conf.Configuration;
    10 import org.apache.hadoop.fs.FileSystem;
    11 import org.apache.hadoop.fs.Path;
    12 import org.apache.hadoop.io.IntWritable;
    13 import org.apache.hadoop.io.Text;
    14 import org.apache.hadoop.mapreduce.Job;
    15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    16 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    18 
    19 public class SeqApp  {
    20     public static void main(String[] args) throws Exception {
    21         Configuration conf = new Configuration();
    22         conf.set("fs.defaultFS","file:///");
    23         FileSystem fs = FileSystem.get(conf);
    24         Job job = Job.getInstance(conf);
    25         job.setJobName("Seq-in");
    26         job.setJarByClass(SeqApp.class);
    27         job.setOutputKeyClass(Text.class);
    28         job.setOutputValueClass(IntWritable.class);
    29         job.setMapperClass(SeqMapper.class);
    30         job.setReducerClass(SeqReducer.class);
    31         //将我们生成的SequenceFile文件作为输入
    32         FileInputFormat.addInputPath(job, new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\seqout"));
    33         Path outPath = new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\out");
    34         if (fs.exists(outPath)){
    35             fs.delete(outPath);
    36         }
    37         FileOutputFormat.setOutputPath(job, outPath);
    38         //设置输入格式
    39         job.setInputFormatClass(SequenceFileInputFormat.class);
    40         //以上设置参数完毕后,我们通过下面这行代码就开始运行job
    41         job.waitForCompletion(true);
    42     }
    43 }

      运行以上代码之后,我们可以查看输出的单词统计情况,具体操作如下:

    二.MR文件格式-DB

    1>.创建数据库表信息

    -- Create the database used by the example and make it the current one.
    create database yinzhengjie;
    
    use yinzhengjie;
    
    -- Input table: one line of text per row (column 1 = id, column 2 = line).
    create table wordcount(id int,line varchar(100));
    
    -- Two sample rows to be split into words.
    insert into wordcount values(1,'hello my name is yinzhengjie');
    
    insert into wordcount values(2,'I am a good boy');
    
    -- Output table: one row per word with its number of occurrences.
    create table wordcount2(word varchar(100),count int);

    2>.编写代码

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.dbformat;
     7 
     8 import org.apache.hadoop.io.Writable;
     9 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    10 
    11 import java.io.DataInput;
    12 import java.io.DataOutput;
    13 import java.io.IOException;
    14 import java.sql.PreparedStatement;
    15 import java.sql.ResultSet;
    16 import java.sql.SQLException;
    17 
    18 /**
    19  *  设置数据对应的格式,需要实现两个接口,即Writable, DBWritable。
    20  */
    21 public class MyDBWritable implements Writable, DBWritable {
    22 
    23     //注意 : 这里我们定义了2个私有属性,这两个属性分别对应的数据库中的字段,id和line
    24     private int id;
    25     private String line;
    26 
    27 
    28     //wrutable串行化
    29     public void write(DataOutput out) throws IOException {
    30         out.writeInt(id);
    31         out.writeUTF(line);
    32     }
    33 
    34     //writable反串行化,注意反串行化的顺序要和串行化的顺序保持一致
    35     public void readFields(DataInput in) throws IOException {
    36         id = in.readInt();
    37         line = in.readUTF();
    38 
    39     }
    40 
    41 
    42     //DB串行化,设置值的操作
    43     public void write(PreparedStatement st) throws SQLException {
    44         //指定表中的第一列为id列
    45         st.setInt(1, id);
    46         //指定表中的第二列为line列
    47         st.setString(2,line);
    48 
    49     }
    50 
    51     //DB反串行,赋值操作
    52     public void readFields(ResultSet rs) throws SQLException {
    53         //读取数据库的第一列,我们赋值给id
    54         id = rs.getInt(1);
    55         //读取数据库的第二列,我们赋值给line
    56         line = rs.getString(2);
    57     }
    58 
    59     public int getId() {
    60         return id;
    61     }
    62 
    63     public void setId(int id) {
    64         this.id = id;
    65     }
    66 
    67     public String getLine() {
    68         return line;
    69     }
    70 
    71     public void setLine(String line) {
    72         this.line = line;
    73     }
    74 }
    MyDBWritable.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.dbformat;
     7 
     8 import org.apache.hadoop.io.Writable;
     9 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    10 
    11 import java.io.DataInput;
    12 import java.io.DataOutput;
    13 import java.io.IOException;
    14 import java.sql.PreparedStatement;
    15 import java.sql.ResultSet;
    16 import java.sql.SQLException;
    17 
    18 public class MyDBWritable2 implements Writable, DBWritable {
    19     //这两个属性分别对应的数据库中的字段,word和count分别对应的是输出表中的字段哟。
    20     private String word;
    21     private int count;
    22     //wrutable串行化
    23     public void write(DataOutput out) throws IOException {
    24         out.writeUTF(word);
    25         out.writeInt(count);
    26     }
    27     //writable反串行化
    28     public void readFields(DataInput in) throws IOException {
    29         word = in.readUTF();
    30         count = in.readInt();
    31 
    32     }
    33     //DB串行化
    34     public void write(PreparedStatement st) throws SQLException {
    35         st.setString(1,word);
    36         st.setInt(2,count);
    37 
    38     }
    39     //DB反串行
    40     public void readFields(ResultSet rs) throws SQLException {
    41         word = rs.getString(1);
    42         count = rs.getInt(2);
    43     }
    44     public String getWord() {
    45         return word;
    46     }
    47     public void setWord(String word) {
    48         this.word = word;
    49     }
    50     public int getCount() {
    51         return count;
    52     }
    53     public void setCount(int count) {
    54         this.count = count;
    55     }
    56 }
    MyDBWritable2.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.dbformat;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.LongWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 
    13 import java.io.IOException;
    14 
    15 /**
    16  * 注意MyDBWritable为数据库输入格式哟
    17  */
    18 public class DBMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {
    19     @Override
    20     protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
    21         String line = value.getLine();
    22         String[] arr = line.split(" ");
    23         for(String word : arr){
    24             context.write(new Text(word), new IntWritable(1));
    25         }
    26     }
    27 }
    DBMapper.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.dbformat;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.NullWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Reducer;
    12 
    13 import java.io.IOException;
    14 
    15 public class DBReducer extends Reducer<Text, IntWritable, MyDBWritable2, NullWritable> {
    16     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    17         Integer sum = 0;
    18         for (IntWritable value : values) {
    19             sum += value.get();
    20         }
    21         MyDBWritable2 db = new MyDBWritable2();
    22         //设置需要往数据表中写入数据的值
    23         db.setWord(key.toString());
    24         db.setCount(sum);
    25         //将数据写到到数据库中
    26         context.write(db,NullWritable.get());
    27     }
    28 }
    DBReducer.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.dbformat;
     7 
     8 import org.apache.hadoop.conf.Configuration;
     9 import org.apache.hadoop.io.IntWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Job;
    12 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    13 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    14 import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    15 
    16 public class DBApp {
    17 
    18     public static void main(String[] args) throws Exception {
    19 
    20         Configuration conf = new Configuration();
    21         conf.set("fs.defaultFS","file:///");
    22         Job job = Job.getInstance(conf);
    23 
    24         job.setJobName("DB");
    25         job.setJarByClass(DBApp.class);
    26 
    27         job.setOutputKeyClass(Text.class);
    28         job.setOutputValueClass(IntWritable.class);
    29 
    30         job.setMapperClass(DBMapper.class);
    31         job.setReducerClass(DBReducer.class);
    32 
    33         String driver = "com.mysql.jdbc.Driver";
    34         String url = "jdbc:mysql://192.168.0.254:5200/yinzhengjie";
    35         String name = "root";
    36         String pass = "yinzhengjie";
    37 
    38         DBConfiguration.configureDB(job.getConfiguration(), driver, url, name, pass);
    39 
    40         DBInputFormat.setInput(job, MyDBWritable.class,"select * from wordcount", "select count(*) from wordcount");
    41 
    42         //指定表名为“wordcount2”并指定字段为2
    43         DBOutputFormat.setOutput(job,"wordcount2",2);
    44 
    45         //指定输入输出格式
    46         job.setInputFormatClass(DBInputFormat.class);
    47         job.setOutputFormatClass(DBOutputFormat.class);
    48 
    49         job.waitForCompletion(true);
    50     }
    51 }

     运行以上代码之后,我们可以查看数据库wordcount2表中的数据是否有新的数据生成,例如在MySQL中执行“select * from wordcount2;”即可看到每个单词及其出现的次数。

  • 相关阅读:
    jQuery选择器
    CSS选择器性能分析
    JavaScript 之垃圾回收和内存管理
    六个字符,带你领略JavaScript (js的艺术编写)
    Redis(1) 简介以及linux环境下的安装
    Teamviewer被商业检测处理办法
    Linux 分配/home的磁盘空间给根目录
    vmware虚拟机安装Oracle Linux 出现Unable to open the image
    Vim编辑器常用命令
    Xshell连接虚拟机中的Ubuntu
  • 原文地址:https://www.cnblogs.com/yinzhengjie/p/9297573.html
Copyright © 2011-2022 走看看