zoukankan      html  css  js  c++  java
  • 《Hadoop学习之路》学习实践二——配置idea远程调试hadoop

    背景:在上篇文章中按照大神“扎心了老铁”的博客,在服务器上搭建了hadoop的伪分布式环境。大神的博客上是使用eclipse来调试,但是我入门以来一直用的是idea,eclipse已经不习惯,于是便摸索着配置了idea远程调试hadoop的环境。

    步骤一:

    下载与服务器上的hadoop同版本的hadoop安装包,我下载的是hadoop-2.7.7.tar.gz,下载后解压到某个目录,比如D:Softwarehadoop-2.7.7,不需要其他配置和安装

    步骤二:

    下载hadooop.dll和winutils.exe,这两个文件主要是调试运行的时候要用,不然运行会报错。我是在网上下载的2.7.x版本的压缩包。解压后放置在D:Softwarehadoop-2.7.7/bin/文件夹下:

    同时将hadoop.dll文件复制到C:WindowsSystem32下,并且重启电脑,否则会报错

    Exception in thread "main"java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

    步骤三:

    配置系统环境变量:

    配置环境变量Path,在现有Path后追加 %HADOOP_HOME%in;%HADOOP_HOME%sbin;

    步骤四:

    在idea中新建一个maven项目,配置pom.xml依赖

    <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.7</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.7.7</version>
            </dependency>
    
        </dependencies>

    步骤五:将服务器上集群中hadoop/etc/hadoop/下的core-site.xml和hdfs-site.xml和log4j.properties配置复制到idea项目下的resources路径下

    注意1:core-site.xml中的fs.defaultFS的值的ip不可用localhost,否则服务器不暴露ip,从PC本地上无法连上hadoop服务;ip在服务器和本地同步修改,服务器端修改后需重启hadoop服务

    步骤六:代码测试

    测试1-网上的hdfs读写的例子up2hdfs.java,直接运行即可

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    
    
    import java.io.*;
    import java.net.URI;
    
    public class up2hdfs {
    
    
        private static String HDFSUri = "hdfs://10.164.96.220:9000";
    
        /**
         * 1、获取文件系统
         *
         * @retrun FileSystem 文件系统
         */
    
        public static FileSystem getFileSystem(){
    
            //读取配置文件
            Configuration conf = new Configuration();
    
            //文件系统
            FileSystem fs = null;
            String hdfsUri = HDFSUri;
            if (StringUtils.isBlank(hdfsUri)){
                //返回默认文件系统,如果在hadoop集群下运行,使用此种方法可直接获取默认文件系统;
                try{
                    fs = FileSystem.get(conf);
                }catch(IOException e){
                    e.printStackTrace();
                }
            }else{
                //返回指定的文件系统,如果在本地测试,需要此种方法获取文件系统;
                try{
                    URI uri = new URI(hdfsUri.trim());
                    fs = FileSystem.get(uri,conf);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return fs ;
        }
    
    
        /**
         * 2、创建文件目录
         * @param path 文件路径
         */
        public static void mkdir(String path){
    
            try {
                FileSystem fs = getFileSystem();
                System.out.println("FilePath"+path);
                //创建目录
                fs.mkdirs(new Path(path));
                //释放资源
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 3、判断目录是否存在
         *
         * @param filePath 目录路径
         * @param create 若不存在是否创建
         *
         */
        public static boolean existDir(String filePath,boolean create){
    
            boolean flag = false;
    
            if (StringUtils.isNotEmpty(filePath)){
                return flag;
            }
    
            try{
                Path path = new Path(filePath);
                //FileSystem对象
                FileSystem fs = getFileSystem();
                if (create){
                    if (!fs.exists(path)){
                        fs.mkdirs(path);
                    }
                }
    
                if (fs.isDirectory(path)){
                    flag = true;
                }
    
            }catch (Exception e){
                e.printStackTrace();
    
            }
    
            return flag;
    
        }
    
        /**
         * 4、本地文件上传至HDFS
         *
         * @param srcFile 源文件路径
         * @param destPath 目的文件路径
         */
    
        public static void copyFileToHDFS(String srcFile,String destPath) throws Exception{
    
            FileInputStream fis = new FileInputStream(new File(srcFile));//读取本地文件
            Configuration config = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(HDFSUri+destPath),config);
            OutputStream os = fs.create(new Path(destPath));
            //cpoy
            IOUtils.copyBytes(fis,os,4096,true);
    
            System.out.println("copy 完成 ......");
            fs.close();
        }
    
        /**
         * 5、从HDFS下载文件到本地
         *
         * @param srcFile 源文件路径
         * @param destPath 目的文件路径
         *
         */
        public static void getFile(String srcFile,String destPath)throws Exception{
    
            //HDFS文件地址
            String file = HDFSUri+srcFile;
            Configuration config = new Configuration();
            //构建filesystem
            FileSystem fs = FileSystem.get(URI.create(file),config);
            //读取文件
            InputStream is = fs.open(new Path(file));
            IOUtils.copyBytes(is,new FileOutputStream(new File(destPath)),2048,true);
            System.out.println("下载完成......");
            fs.close();
        }
    
        /**
         * 6、删除文件或者文件目录
         *
         * @param path
         */
        public static void rmdir(String path){
    
            try {
                //返回FileSystem对象
                FileSystem fs = getFileSystem();
    
                String hdfsUri = HDFSUri;
                if (StringUtils.isNotBlank(hdfsUri)){
    
                    path = hdfsUri+path;
                }
                System.out.println("path"+path);
                //删除文件或者文件目录 delete(Path f)此方法已经弃用
                System.out.println(fs.delete(new Path(path),true));
    
                fs.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 7、读取文件的内容
         *
         * @param filePath
         * @throws IOException
         */
        public static void readFile(String filePath)throws IOException{
    
            Configuration config = new Configuration();
            String file = HDFSUri+filePath;
            FileSystem fs = FileSystem.get(URI.create(file),config);
            //读取文件
            InputStream is =fs.open(new Path(file));
            //读取文件
            IOUtils.copyBytes(is, System.out, 2048, false); //复制到标准输出流
            fs.close();
        }
    
    
        /**
         * 主方法测试
         */
        public static void main(String[] args) throws Exception {//200         //连接fs
    
            FileSystem fs = getFileSystem();
            System.out.println(fs.getUsed());
            //创建路径
            mkdir("/dit2");
            //验证是否存在
            System.out.println(existDir("/dit2",false));
            //上传文件到HDFS
            copyFileToHDFS("G:\testFile\HDFSTest.txt","/dit/HDFSTest.txt");
            //下载文件到本地
            getFile("/dit/HDFSTest.txt","G:\HDFSTest.txt");
            // getFile(HDFSFile,localFile);
            //删除文件
            rmdir("/dit2");
            //读取文件
            readFile("/dit/HDFSTest.txt");
        }
    
    }

    测试2-网上的mapreduce的例子

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    public class WcMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
    // IntWritable one=new IntWritable(1);
            String line=value.toString();
            StringTokenizer st=new StringTokenizer(line);
    //StringTokenizer "kongge"
            while (st.hasMoreTokens()){
                String word= st.nextToken();
                context.write(new Text(word),new IntWritable(1));   //output
            }
        }
    }
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    
    /**
     * Created by iespark on 2/26/16.
     */
    public class McReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> iterable, Context context) throws IOException, InterruptedException {
            int sum=0;
            for (IntWritable i:iterable){
                sum=sum+i.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Created by iespark on 2/26/16.
     */
    public class JobRun {
        public static void main(String[] args){
            System.setProperty("hadoop.home.dir", "D:\Software\hadoop-2.7.7");
            Configuration conf=new Configuration();
            try{
                Job job = Job.getInstance(conf, "word count");
    //            Configuration conf, String jobName;
                job.setJarByClass(JobRun.class);
                job.setMapperClass(WcMapper.class);
                job.setReducerClass(McReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                //解决No job jar file set. User classes may not be found. See Job or Job#setJar(String)报错的问题
    //            job.setJar("E:\idea2017workspace\myhadoop\out\artifacts\myhadoop_jar\myhadoop.jar");
    
                FileInputFormat.addInputPath(job,new Path(args[0]));
                FileSystem fs= FileSystem.get(conf);
                Path op1=new Path(args[1]);
                if(fs.exists(op1)){
                    fs.delete(op1, true);
                    System.out.println("存在此输出路径,已删除!!!");
                }
                FileOutputFormat.setOutputPath(job,op1);
                System.exit(job.waitForCompletion(true)?0:1);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    JobRun.java运行时需要输入参数,hdfs计算输入和输出的路径,或者在run configurations中配置,如下图,注意路径的hdfs地址需要为core-site.xml中的地址

  • 相关阅读:
    实用JS代码
    javascript-table出现滚动条表格自动对齐
    Javascript-关于for in和forEach
    Javascript-关于break、continue、return语句
    如何编写可怕的Java代码?
    请停止编写这么多的for循环!
    如何优雅地在Stack Overflow提问?
    为什么阿里巴巴Java开发手册中强制要求不要在foreach循环里进行元素的remove和add操作?
    Java异常处理只有Try-Catch吗?
    看完这篇还不会用Git,那我就哭了!
  • 原文地址:https://www.cnblogs.com/suntingme/p/10339110.html
Copyright © 2011-2022 走看看