zoukankan      html  css  js  c++  java
  • Hadoop 系列(三)Java API

    Hadoop 系列(三)Java API

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.2</version>
    </dependency>
    

    一、HDFS 操作

    @Test
    public void upload() throws Exception {
    
        Configuration conf = new Configuration();  // (1) 
        //conf.set("fs.defaultFS", "hdfs://master:9000/");
    
        Path dst = new Path("hdfs://master:9000/upload/MPSetup4.log");
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "hadoop"); // (2)
        FSDataOutputStream os = fs.create(dst);
        FileInputStream is = new FileInputStream("c:/MPSetup.log");
    
        IOUtils.copy(is, os);
    }
    
    1. Configuration 配置文件默认读取 resources 目录下的 core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml 文件。可以将 Hadoop 安装目录下的这些配制文件直接拷贝过来,也可以直接 conf.set() 设置参数。

    2. FileSystem.get() 必须要以 hadoop 的身份运行,否则会出现权限不足的问题。可以配置 -DHADOOP_USER_NAME=hadoop 参数。

    下面提供一个 HdfsUtil 工具类:

    public class HdfsUtil {
        FileSystem fs = null;
    
        @Before
        public void init() throws Exception{
            System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/");
            //1. 读取classpath下的xxx-site.xml 配置文件,并解析其内容,封装到conf对象中
            Configuration conf = new Configuration();
    
            //2. 也可以在代码中对conf中的配置信息进行手动设置,会覆盖掉配置文件中的读取的值
            conf.set("fs.defaultFS", "hdfs://master:9000/");
    
            //3. 根据配置信息,去获取一个具体文件系统的客户端操作实例对象
            fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "hadoop");
        }
    
        /** 上传文件,封装好的写法 */
        @Test
        public void upload2() throws Exception, IOException{
            fs.copyFromLocalFile(new Path("c:/MPSetup.log"),
                    new Path("hdfs://master:9000/aaa/bbb/ccc/MPSetup.log"));
        }
    
    
        /** 下载文件 */
        @Test
        public void download() throws Exception {
            fs.copyToLocalFile(new Path("hdfs://master:9000/aaa/bbb/ccc/MPSetup.log"),
                    new Path("d:/MPSetup2.txt"));
    
        }
    
        /** 查看文件信息 */
        @Test
        public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
    
            // listFiles列出的是文件信息,而且提供递归遍历
            RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);
    
            while(files.hasNext()) {
                LocatedFileStatus file = files.next();
                Path filePath = file.getPath();
                String fileName = filePath.getName();
                System.out.println(fileName);
            }
    
            System.out.println("---------------------------------");
    
            //listStatus 可以列出文件和文件夹的信息,但是不提供自带的递归遍历
            FileStatus[] listStatus = fs.listStatus(new Path("/"));
            for(FileStatus status: listStatus){
                String name = status.getPath().getName();
                System.out.println(name + (status.isDirectory()?" is dir":" is file"));
            }
        }
    
        /** 创建文件夹 */
        @Test
        public void mkdir() throws IllegalArgumentException, Exception {
            fs.mkdirs(new Path("/aaa/bbb/ccc"));
        }
    
        /** 删除文件或文件夹 */
        @Test
        public void rm() throws IllegalArgumentException, IOException {
            fs.delete(new Path("/aa"), true);
        }
    }
    

    二、RPC 调用

    (1) LoginServiceInterface 接口

    package com.github.binarylei.hadoop.rpc;
    
    public interface LoginServiceInterface {
        
        public static final long versionID = 1L;
        public String login(String username, String password);
    
    }
    
    public class LoginServiceImpl implements LoginServiceInterface {
    
        @Override
        public String login(String username, String password) {       
            return username + " login in successfully!";
        }
    }
    

    (2) RPCServer

    // 目前只能上传到 Linux 上运行 ??????
    public class RPCServer {
    
        private static String host = "master";
        private static int port = 10001;
    
        public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://master:9000/");
            Builder builder = new Builder(conf);
            
            builder.setBindAddress("master")
                    .setPort(port)
                    .setProtocol(LoginServiceInterface.class)
                    .setInstance(new LoginServiceImpl());
            
            Server server = builder.build();
            
            server.start();
        }
    }
    
    1. 将打包后的 hadoop-api-1.0.0.jar 上传到 Linux,启动 RPC 服务,执行

      hadoop jar hadoop-api-1.0.0.jar com.github.binarylei.hadoop.rpc.RPCServer

      2018-05-13 18:20:16,606 INFO ipc.CallQueueManager: Using callQueue: class java.util.concurrent.LinkedBlockingQueue queueCapacity: 100 scheduler: class org.apache.hadoop.ipc.DefaultRpcScheduler
      2018-05-13 18:20:17,631 INFO ipc.Server: Starting Socket Reader #1 for port 10001
      2018-05-13 18:20:19,613 INFO ipc.Server: IPC Server Responder: starting
      2018-05-13 18:20:19,618 INFO ipc.Server: IPC Server listener on 10001: starting

    (3) RPCClient

    public class RPCClient {
    
        private static String host = "master";
        private static int port = 10001;
    
        public static void main(String[] args) throws Exception {
            System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/");
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://master:9000/");
    
            LoginServiceInterface proxy = RPC.getProxy(
                    LoginServiceInterface.class,
                    1L,
                    new InetSocketAddress(host, port),
                    conf);
            
            String result = proxy.login("hadoop-test", "test");
            
            System.out.println(result);
        }
    }
    
    1. 直接在 Windows 上运行,结果如下:

      hadoop-test login in successfully!

    三、MapReduce

    下面模仿 wordcount,写一个 MapReduce

    (1) WCMapper

    //4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型
    //map 和 reduce 的数据输入输出都是以 key-value对的形式封装的
    //默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
        //mapreduce框架每读一行数据就调用一次该方法
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中 key-value
            //key 是这一行数据的起始偏移量     value 是这一行的文本内容
    
            //将这一行的内容转换成string类型
            String line = value.toString();
    
            //对这一行的文本按特定分隔符切分
            String[] words = StringUtils.split(line, " ");
    
            //遍历这个单词数组输出为kv形式  k:单词   v : 1
            for(String word : words){
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
    

    (2) WCReducer

    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
        //框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法
        //<hello,{1,1,1,1,1,1.....}>
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context)
                throws IOException, InterruptedException {
    
            long count = 0;
            //遍历value的list,进行累加求和
            for(LongWritable value:values){
                count += value.get();
            }
    
            //输出这一个单词的统计结果
    
            context.write(key, new LongWritable(count));
        }
    }
    

    (3) WCReducer

    /**
     * 用来描述一个特定的作业
     * 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
     * 还可以指定该作业要处理的数据所在的路径
     * 还可以指定改作业输出的结果放到哪个路径
     * ....
     * @author duanhaitao@itcast.cn
     */
    public class WCRunner {
    
        public static void main(String[] args) throws Exception {
    
            //System.setProperty("hadoop.home.dir", "D:/Program_Files/apache/hadoop-common-bin/");
            Configuration conf = new Configuration();
            Job wcjob = Job.getInstance(conf);
    
            //设置整个job所用的那些类在哪个jar包
            wcjob.setJarByClass(WCRunner.class);
    
            //本job使用的mapper和reducer的类
            wcjob.setMapperClass(WCMapper.class);
            wcjob.setReducerClass(WCReducer.class);
    
            //指定reduce的输出数据kv类型
            wcjob.setOutputKeyClass(Text.class);
            wcjob.setOutputValueClass(LongWritable.class);
    
            //指定mapper的输出数据kv类型
            wcjob.setMapOutputKeyClass(Text.class);
            wcjob.setMapOutputValueClass(LongWritable.class);
    
            //指定要处理的输入数据存放路径
            FileInputFormat.setInputPaths(wcjob, new Path("hdfs://master:9000/wc/input/"));
    
            //指定处理结果的输出数据存放路径
            FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://master:9000/wc/output5/"));
    
            //将job提交给集群运行
            wcjob.waitForCompletion(true);
        }
    }
    

    四、Hadoop 运行(Windows)

    问题 1:缺少 winutils.exe 和 hadoop.dll

    # 缺少 winutils.exe
    Could not locate executable null inwinutils.exe in the hadoop binaries
    # 缺少 hadoop.dll
    Unable to load native-hadoop library for your platform… using builtin-Java classes where applicable
    

    解决办法:

    1. 下载地址:https://github.com/srccodes/hadoop-common-2.2.0-bin
    2. 解压后将 hadoop-common-2.2.0-bin/bin 目录下的文件全部拷贝到 HADOOP_HOME/bin 目录下,并配置 HADOOP_HOME 环境变量。
    3. 将 hadoop-common-2.2.0-bin/bin/hadoop.dll 拷贝到 C:WindowsSystem32 目录下。

    问题 2:Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

    解决办法:

    1. 首先确保 C:WindowsSystem32 目录下已经有 hadoop.dll 文件

    2. 在自己的工程中拷贝一份 org.apache.hadoop.io.nativeio.NativeIO 类,修改如下:

      public static boolean access(String path, AccessRight desiredAccess)
                      throws IOException {
          return true;
          //return access0(path, desiredAccess.accessRight());
      }
      

    参考:

    1. 《Hadoop 运行问题》:https://blog.csdn.net/congcong68/article/details/42043093
    2. 《winutils.exe 下载地址》:https://github.com/srccodes/hadoop-common-2.2.0-bin

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    Java实现 蓝桥杯VIP 算法训练 传球游戏
    Java实现 蓝桥杯VIP 算法训练 Hanoi问题
    Java实现 蓝桥杯VIP 算法训练 蜜蜂飞舞
    Java实现 蓝桥杯VIP 算法训练 奇偶判断
    Java实现 蓝桥杯VIP 算法训练 传球游戏
    Java实现 蓝桥杯VIP 算法训练 Hanoi问题
    Java实现 蓝桥杯VIP 算法训练 Hanoi问题
    Java实现 蓝桥杯VIP 算法训练 蜜蜂飞舞
    Java实现 蓝桥杯VIP 算法训练 蜜蜂飞舞
    Qt: 访问容器(三种方法,加上for循环就四种了)good
  • 原文地址:https://www.cnblogs.com/binarylei/p/10460865.html
Copyright © 2011-2022 走看看