zoukankan      html  css  js  c++  java
  • 深入理解Hadoop读书笔记-3

    背景

    公司的物流业务系统目前实现了使用storm集群进行过门事件的实时计算处理,但是还有一个需求,我们需要存储每个标签上传的每条明细数据,然后进行定期的标签报表统计,这个是目前的实时计算框架无法满足的,需要考虑离线存储和计算引擎。

    标签的数据量是巨大的,此时存储在mysql中是不合适的,所以我们考虑了分布式存储系统HDFS。目前考虑的架构是,把每条明细数据存储到HDFS中,利用Hive或者其他类SQL的解析引擎,定期进行离线统计计算。

    查找相关资料后,我下载了深入理解Haddoop这本书,从大数据的一些基础原理开始调研,这一系列的笔记就是调研笔记。

    系列文章:

    深入理解Hadoop读书笔记1

    深入理解Hadoop读书笔记2

    深入理解Hadoop-基础部分

    这篇笔记主要是参考慕课网的 Hadoop3基础与电商行为日志分析 新手也能学会的大数据入门课这门课程,之所以加入基础部分,是因为跟着书籍目录在搭建好Linux下的Hadoop的运行环境后,继续往下看书时,发现在IDEA部分卡住了,所以回过头参考视频来学习如何使用IDEA开发,以及学习HDFS的API和MapReduce的一部分API,通过实操简单入门。

    1. HDFS的API使用

    深入理解Hadoop读书笔记2](https://www.cnblogs.com/ging/p/13565645.html)中讲解过Hadoop在Linux环境下的部署,在虚拟机中安装好Hadoop后,我们需要使用IDEA来进行开发调试。

    下面会讲解Linux下使用IDEA,通过HDFS的JAVA API,来连接并操作虚拟机中的HDFS创建一个文件夹的具体过程。

    IDEA的安装和破解这里略过,默认读者是有一定JAVA开发经验的。

    安装好IDEA后,新建一个maven的项目,这里选择下面的quickstart模板。

    建立好项目后,修改pom文件,增加hadoop-client相关,因为我虚拟机中安装的是2.10.0版本,所以这里也使用相同的客户端版本。

     <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.10.0</version>
     </dependency>
    

    然后建立一个Class文件,添加下面的代码,即可实现,远程连接HDFS并在其中新建一个路径为/hdfs/test的文件夹。

    这里有几个要点需要注意:

    1. HDFS进行伪分布式部署的时候,core-site.xml中填写的是localhost,现在需要修改为虚拟机的ip地址,否则使用下面代码连接HDFS时会报下面的异常。

      failed on connection exception: java.net.ConnectException:Connection refused
      
      <configuration>
      <property>
              <name>fs.defaultFS</name>
              <value>hdfs://192.168.202.129:9000</value>
          </property>
      </configuration>
      
      
    2. 用户名需要填写为HDFS中的文件夹拥有者的用户名

      用命令查看下可知,文件夹的所有者为ging

      ging@ubuntu:~/hadoop/hadoop-2.10.0$ bin/hdfs dfs -ls /
      Found 3 items
      drwxr-xr-x   - ging supergroup          0 2020-09-01 23:47 /hdfs
      drwx------   - ging supergroup          0 2020-08-30 20:26 /tmp
      drwxr-xr-x   - ging supergroup          0 2020-08-30 20:27 /user
      

    完整代码如下:

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import java.net.URI;
    
    public class HDFSApp {
    
        public static void main(String[] args) throws Exception {
            //注意URI中为虚拟机中的HDFS的地址和端口
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.202.129:9000"), new Configuration(), "ging");
            //给定一个路径,新建一个文件夹,并打印返回结果
            boolean result = fileSystem.mkdirs(new Path("/hdfs/test"));
            System.out.println(result);
        }
    }
    
    

    返回结果:

    Connected to the target VM, address: '127.0.0.1:59339', transport: 'socket'
    true
    Disconnected from the target VM, address: '127.0.0.1:59339', transport: 'socket'
    

    除了上面用来演示讲解的创建文件夹功能,下面的代码还记录了,常用的基本HDFS的JAVA API,可以参考

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.*;
    import java.net.URI;
    
    /**
     * Unit test for simple App.
     */
    public class AppTest {
    
        public static final String HDFS = "hdfs://192.168.202.129:9000";
        private Configuration configuration;
        private FileSystem fileSystem;
    
        @Before
        public void setUp() throws Exception {
            System.out.println("---setup---");
            configuration = new Configuration();
            fileSystem = FileSystem.get(new URI(HDFS), configuration, "ging");
        }
    
        @Test
        public void mkdirs() throws IOException {
            boolean result = fileSystem.mkdirs(new Path("/hdfs/test/test"));
            System.out.println(result);
        }
    
        @Test
        public void text() throws Exception {
            FSDataInputStream inputStream = fileSystem.open(new Path("/user/ging/input/core-site.xml"));
            IOUtils.copyBytes(inputStream, System.out, 1024);
        }
    
        @Test
        public void create() throws Exception {
            FSDataOutputStream outputStream = fileSystem.create(new Path("/hdfs/test/test/a.txt"));
            outputStream.writeUTF("hello world");
            outputStream.flush();
            outputStream.close();
        }
    
        @Test
        public void rename() throws Exception {
            Path oldPath = new Path("/hdfs/test/test/a.txt");
            Path newPath = new Path("/hdfs/test/test/b.txt");
            boolean rename = fileSystem.rename(oldPath, newPath);
            System.out.println(rename);
        }
    
    
        @Test
        public void copyFromLocal() throws Exception {
            Path local = new Path("C:\Users\wgg96\Documents\personal-code\suanfa-note\CMakeLists.txt");
            Path remote = new Path("/hdfs/test/test/c.txt");
            fileSystem.copyFromLocalFile(local, remote);
        }
    
        @Test
        public void copyBigFileWithProgress() throws Exception {
            InputStream inputStream = new BufferedInputStream(new FileInputStream(new File("C:\Users\wgg96\Documents\安装包\开发\jdk-8u251-windows-x64.exe")));
    
            FSDataOutputStream outputStream = fileSystem.create(new Path("/hdfs/test/test/d.exe"), new Progressable() {
                @Override
                public void progress() {
                    System.out.print(".");
                }
            });
    
            IOUtils.copyBytes(inputStream, outputStream, 4096);
        }
    
    
        /**
         * TODO 这里在windows系统下跑会报异常
         *
         * @throws Exception
         */
        @Test
        public void copyToLocal() throws Exception {
            fileSystem.copyToLocalFile(new Path("/hdfs/test/test/d.exe"), new Path("/tmp/d.exe"));
        }
    
    
        @Test
        public void listFile() throws Exception {
            FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/hdfs/test/test/"));
            for (FileStatus fileStatus : fileStatuses) {
                String isDir = fileStatus.isDirectory() ? "文件夹" : "文件";
                String permission = fileStatus.getPermission().toString();
                short replication = fileStatus.getReplication();
                String owner = fileStatus.getOwner();
                String group = fileStatus.getGroup();
                String path = fileStatus.getPath().toString();
                System.out.println(isDir + "	" +
                        permission + "	" +
                        replication + "	" +
                        owner + "	" +
                        group + "	" +
                        path + "	"
                );
            }
        }
    
    
        @After
        public void tearDown() {
            configuration = null;
            fileSystem = null;
            System.out.println("---teardown---");
        }
    }
    
    

    2. MapReduce的API使用

    这部分内容,是使用MapReduce编写了一个词频统计任务,代码中使用了两种模式

    • 输入输出使用HDFS -对应了APP
    • 输入输出使用本地路径模式 -对应了APPLocal

    核心代码如下

    Mapper

    package org.example;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    
    /**
     * LongWritable:偏移量
     * Text:每一行的字符串
     * Text:单词
     * IntWritable:单词词频,map阶段,默认为1
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //分割每一行
            String oriText = value.toString();
            String[] strings = oriText.split("	");
    
            //拿到一个单词就把词频写到上下文中
            for (String word : strings) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
    

    Reducer

    package org.example;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * Text:词频统计中的每一个单词
     * Iterable<IntWritable> : 词频统计中对应的多个1,可以理解成一个列表
     * Text:输出每一个单词
     * IntWritable:每个单词的总词频
     */
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            Iterator<IntWritable> iterator = values.iterator();
            int finalCount = 0;
            while (iterator.hasNext()) {
                IntWritable intWritable = iterator.next();
                finalCount += intWritable.get();
            }
            context.write(key, new IntWritable(finalCount));
        }
    }
    
    

    APPLocal

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.File;
    import java.io.IOException;
    
    /**
     * Hello world!
     */
    public class AppLocal {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //        System.setProperty("HADOOP_USER_NAME", "ging");
            Configuration configuration = new Configuration();
    //        configuration.set("fs.defaultFS", "hdfs://192.168.202.129:9000");
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(AppLocal.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //增加combiner操作
            job.setCombinerClass(WordCountReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            File file = new File("/home/ging/Documents/hadoop-learn/output");
            System.out.println(deleteDir(file));
    
            FileInputFormat.setInputPaths(job, new Path("/home/ging/a.txt"));
            FileOutputFormat.setOutputPath(job, new Path("output"));
    
            boolean result = job.waitForCompletion(true);
            System.out.println(result);
        }
    
        /**
         * delete file
         * @param src
         * @return
         */
        private static boolean deleteDir(File src) {
            try {
                if (src.isFile()) {
                    src.delete();
                } else {
                    String[] list = src.list();
                    for (String s : list) {
                        deleteDir(new File(src, s));
                    }
                    src.delete();
                }
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    }
    
    

    APP

    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.net.URI;
    
    /**
     * Hello world!
     */
    public class App {
    
        public static void main(String[] args) throws Exception {
            System.setProperty("HADOOP_USER_NAME", "ging");
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", "hdfs://192.168.202.129:9000");
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(App.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //增加combiner操作
            job.setCombinerClass(WordCountReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.202.129:9000"), configuration, "ging");
    
            Path output = new Path("/hdfs/test/output");
            if (fileSystem.exists(output)) {
                fileSystem.delete(output, true);
            }
    
            FileInputFormat.setInputPaths(job, new Path("/hdfs/test/input"));
            FileOutputFormat.setOutputPath(job, output);
    
            boolean result = job.waitForCompletion(true);
            System.out.println(result);
        }
    }
    
    
  • 相关阅读:
    USB小白学习之路(6) IIC EEPROM读取解析
    USB小白学习之路(5) HID鼠标程序
    USB小白学习之路(4)HID键盘程序
    USB小白学习之路(3) 通过自定义请求存取外部RAM
    USB小白学习之路(2)端点IN/OUT互换
    USB小白学习之路(1) Cypress固件架构解析
    LeetCode -- 14 最长公共前缀
    初识docker——对docker的理解
    洛谷 P5461 赦兔战俘
    知识碎片 —— 数组 与 伪数组
  • 原文地址:https://www.cnblogs.com/ging/p/13602346.html
Copyright © 2011-2022 走看看