zoukankan      html  css  js  c++  java
  • 寒假学习(四)编写MapReduce程序清洗信件内容数据

    对爬取到的数据进行清洗,按照一定的规则把“脏数据”“洗掉”。

    数据清洗是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。

    import java.io.BufferedReader;  
    import java.io.InputStreamReader;  
      
    import java.io.IOException;  
      
    import org.apache.hadoop.fs.FSDataInputStream;  
    import org.apache.hadoop.fs.FileSystem;  
    import org.apache.hadoop.fs.Path;  
      
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.InputSplit;  
    import org.apache.hadoop.mapreduce.JobContext;  
    import org.apache.hadoop.mapreduce.RecordReader;  
    import org.apache.hadoop.mapreduce.TaskAttemptContext;  
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
      
    public class FileRecordReader extends RecordReader<text,text>{  
      
        private FileSplit fileSplit;  
        private JobContext jobContext;  
        private Text currentKey = new Text();  
        private Text currentValue = new Text();  
        private boolean finishConverting = false;  
        @Override  
        public void close() throws IOException {  
      
      
        @Override  
        public Text getCurrentKey() throws IOException, InterruptedException {  
            return currentKey;  
        }  
      
        @Override  
        public Text getCurrentValue() throws IOException,  
                InterruptedException {  
            return currentValue;  
        }  
      
        @Override  
        public float getProgress() throws IOException, InterruptedException {  
            float progress = 0;  
            if(finishConverting){  
                progress = 1;  
            }  
            return progress;  
        }  
      
        @Override  
        public void initialize(InputSplit arg0, TaskAttemptContext arg1)  
                throws IOException, InterruptedException {  
            this.fileSplit = (FileSplit) arg0;  
            this.jobContext = arg1;  
            String filename = fileSplit.getPath().getName();  
            this.currentKey = new Text(filename);  
        }  
      
        @Override  
        public boolean nextKeyValue() throws IOException, InterruptedException {  
            if(!finishConverting){  
                int len = (int)fileSplit.getLength();  
    //          byte[] content = new byte[len];  
                Path file = fileSplit.getPath();  
                FileSystem fs = file.getFileSystem(jobContext.getConfiguration());  
                FSDataInputStream in = fs.open(file);  
    //根据实际网页的编码格式修改  
     //         BufferedReader br = new BufferedReader(new InputStreamReader(in,"gbk"));  
                BufferedReader br = new BufferedReader(new InputStreamReader(in,"utf-8"));  
                String line="";  
                String total="";  
                while((line= br.readLine())!= null){  
                    total =total+line+"
    ";  
                }  
                br.close();  
                in.close();  
                fs.close();  
                currentValue new Text(total);  
                finishConverting true;  
                return true;  
            }  
            return false;  
        }  
      
    }  

     完整代码:

    import java.io.IOException;  
    import org.apache.hadoop.fs.Path;  
    import org.apache.hadoop.io.IntWritable;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Job;  
    import org.apache.hadoop.mapreduce.Mapper;  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    import cn.wanghaomiao.xpath.exception.XpathSyntaxErrorException;  
    import cn.wanghaomiao.xpath.model.JXDocument;  
      
    public class QingxiHtml {  
        public static class doMapper extends Mapper<object, text,="" text=""> {  
            public static final IntWritable one = new IntWritable(1);  
            public static Text word = new Text();  
      
            @Override  
            protected void map(Object key, Text value, Context context)  
                    throws IOException, InterruptedException {  
                String htmlStr = value.toString();  
                JXDocument Document new JXDocument(htmlStr);  
                if (htmlStr.indexOf("mail_track_h2") > 0) {  
                    try {  
                        String leixing = Document  
                                .sel("//span[@class='font12 gray']/a[2]/text()")  
                                .get(0).toString();  
                        String biaoti = Document  
                                .sel("//h2[@class='mail_track_h2']/text()").get(0)  
                                .toString();  
                        String leixinren = Document  
                                .sel("//p[@class='font12 gray time_mail']/span[1]/text()")  
                                .get(0).toString().replaceAll("来信人:", "");  
                        String shijian = Document  
                                .sel("//p[@class='font12 gray time_mail']/span[2]/text()")  
                                .get(0).toString().replaceAll("时间:", "");  
                        String number = Document  
                                .sel("//p[@class='font12 gray time_mail']/span[3]/allText()")  
                                .get(0).toString().replace("网友同问: ", "").replace("网友评价数: ", "");  
                        String problem = Document  
                                .sel("//span[@class='font14 mail_problem']/text()")  
                                .get(0).toString();  
                        if (htmlStr.indexOf("margin-bottom:31px") > 0) {  
                        String offic = Document  
                                    .sel("//div[@class='con_left float_left']/div[2]/span[1]/text()")  
                                    .get(0).toString();  
                        String officpt = Document  
                                    .sel("//div[@class='con_left float_left']/div[2]/span[2]/text()")  
                                    .get(0).toString();  
      
                            String officp = Document  
                                    .sel("//div[@class='con_left float_left']/div[2]/p[1]/text()")  
                                    .get(0).toString();  
                        String dataout = leixing + "	" + biaoti + "	"  
                                    + leixinren + "	" + shijian + "	" + number  
                                    + "	" + problem + "	" + offic + "	"  
                                    + officpt + "	"+ officp;  
                            System.out.println(dataout);  
                            Text oneLines new Text(dataout);  
                            context.write(oneLines, new Text(""));  
                    } else {  
                            String dataout = leixing + "	" + biaoti + "	"  
                                    + leixinren + "	" + shijian + "	" + number  
                                    + "	" + problem;  
                            System.out.println(dataout);  
                            Text oneLines new Text(dataout);  
                            context.write(oneLines, new Text(""));  
                        }  
      
                    } catch (XpathSyntaxErrorException e) {  
                        // TODO Auto-generated catch block  
                        e.printStackTrace();  
                    }  
                }  
            }  
        }  
      
        public static void main(String[] args) throws IOException,  
                ClassNotFoundException, InterruptedException {  
            Job job = Job.getInstance();  
            job.setJobName("QingxiHtml");  
            job.setJarByClass(QingxiHtml.class);  
            job.setMapperClass(doMapper.class);  
      
            job.setOutputKeyClass(Text.class);  
            job.setOutputValueClass(Text.class);  
            job.setInputFormatClass(FileInput.class);  
            Path in new Path("hdfs://localhost:9000//myedu2/in");  
            Path out new Path("hdfs://localhost:9000//myedu2/out/1");  
            FileInputFormat.addInputPath(job, in);  
            FileOutputFormat.setOutputPath(job, out);  
            System.exit(job.waitForCompletion(true) ? 0 : 1);  
        }  
    }  

     

  • 相关阅读:
    【原创】C# 文件操作详解(三)Directory类
    【原创】C# 文件操作详解(一)File类
    【原创】VS使用技巧——工欲善其事必先利其器
    【原创】C# 文件操作详解(四)DirectoryInfo类
    strpos用法
    调试跳转动态打印
    解决DIV超出样式长度自动换行
    PHP时间戳常用转换在(大、小月问题)
    懒人JS
    PHP 快速排序 与二维数组排序
  • 原文地址:https://www.cnblogs.com/sonofdemon/p/12253401.html
Copyright © 2011-2022 走看看