zoukankan      html  css  js  c++  java
  • Hadoop API:遍历文件分区目录,并根据目录下的数据进行并行提交spark任务

    Hadoop 的 FileSystem API 提供了遍历文件的方法(例如 listStatus),借助它可以遍历 HDFS 上的文件目录:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    /**
     * Lists the {@code objectid=<id>} sub-directories of two table directories under
     * {@code /myuser/hivedb/}, keeps the object ids present in both, and runs the submit
     * script once per shared id, each on its own thread.
     */
    public class BatchSubmitMain {
        /**
         * Program entry point.
         *
         * @param args {@code args[0]}: first table directory name (mrTableName);
         *             {@code args[1]}: second table directory name (fglibTableName)
         * @throws IllegalArgumentException if fewer than two arguments are given
         * @throws Exception if listing HDFS fails or the main thread is interrupted while waiting
         */
        public static void main(String[] args) throws Exception {
            if (args == null || args.length < 2) {
                throw new IllegalArgumentException("Usage: BatchSubmitMain <mrTableName> <fglibTableName>");
            }
            String mrTableName = args[0];
            String fglibTableName = args[1];
    
            Configuration conf = new Configuration();
            // Same effect as <property><name>fs.defaultFS</name><value>hdfs://hcluster</value></property>
            conf.set("fs.defaultFS", "hdfs://hcluster");
            FileSystem fileSystem = FileSystem.get(conf);
    
            String mrFilePath = "/myuser/hivedb/" + mrTableName;
            String fglibFilePath = "/myuser/hivedb/" + fglibTableName;
    
            System.out.println(mrFilePath);
            List<String> mrObjectIdItems = getObjectIdItems(fileSystem, mrFilePath);
    
            System.out.println(fglibFilePath);
            List<String> fglibObjectIdItems = getObjectIdItems(fileSystem, fglibFilePath);
    
            // Content comparison is required here: the original `==` compared String
            // references, so ids read from two different listings never matched.
            List<String> objectIdItems = intersect(mrObjectIdItems, fglibObjectIdItems);
    
            String submitShPath = "/app/myaccount/service/submitsparkjob.sh";
    
            // One count per submitted id; each ImportThread counts down when it finishes.
            CountDownLatch threadSignal = new CountDownLatch(objectIdItems.size());
    
            for (String objectId : objectIdItems) {
                Thread thread = new ImportThread(objectId, submitShPath, threadSignal);
                thread.start();
            }
    
            threadSignal.await();
    
            System.out.println(Thread.currentThread().getName() + "complete");
        }
    
        /**
         * Returns the elements of {@code left}, in order, that also occur in {@code right}
         * (compared with {@code equals}).
         */
        private static List<String> intersect(List<String> left, List<String> right) {
            Set<String> rightSet = new HashSet<>(right);
            List<String> result = new ArrayList<>();
            for (String item : left) {
                if (rightSet.contains(item)) {
                    result.add(item);
                }
            }
            return result;
        }
    
        /**
         * Lists the direct sub-directories of {@code filePath} and returns their names with
         * the {@code "objectid="} prefix removed. Plain files are ignored.
         *
         * @param fileSystem file system to list
         * @param filePath   directory to list
         * @return object ids, in the order returned by {@link FileSystem#listStatus(Path)}
         * @throws FileNotFoundException if {@code filePath} does not exist
         * @throws IOException           on other listing failures
         */
        private static List<String> getObjectIdItems(FileSystem fileSystem, String filePath) throws FileNotFoundException, IOException {
            List<String> objectItems = new ArrayList<>();
    
            FileStatus[] files = fileSystem.listStatus(new Path(filePath));
            for (FileStatus file : files) {
                if (file.isDirectory()) {
                    // Path.getName() is already the last path component, so no split on "/" is needed.
                    String objectId = file.getPath().getName().replace("objectid=", "");
                    objectItems.add(objectId);
                    System.out.println(objectId);
                }
            }
    
            return objectItems;
        }
    
        /**
         * Recursively prints every directory and file under {@code path}, with owner and (for
         * files) length. Errors are printed and otherwise ignored.
         *
         * @param hdfs
         *            FileSystem object; nothing is printed if null
         * @param path
         *            path to start from; nothing is printed if null
         */
        public static void iteratorShowFiles(FileSystem hdfs, Path path) {
            try {
                if (hdfs == null || path == null) {
                    return;
                }
    
                FileStatus[] files = hdfs.listStatus(path);
    
                for (int i = 0; i < files.length; i++) {
                    try {
                        if (files[i].isDirectory()) {
                            System.out.print(">>>" + files[i].getPath() + ", dir owner:" + files[i].getOwner());
                            // Recurse into the sub-directory.
                            iteratorShowFiles(hdfs, files[i].getPath());
                        } else if (files[i].isFile()) {
                            System.out.print(" " + files[i].getPath() + ",length:" + files[i].getLen() + ", owner:" + files[i].getOwner());
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    并行执行sh的线程:

    import java.util.concurrent.CountDownLatch;
    
    /**
     * Thread that runs the submit script for a single object id and then counts down a
     * shared latch, so the launching thread can wait for all submissions to finish.
     */
    public class ImportThread extends Thread {
        private final JavaShellInvoker javaShellInvoker = new JavaShellInvoker();
    
        private final CountDownLatch countDownLatch;
        private final String objectId;
        private final String submitShPath;
    
        /**
         * @param objectId       argument passed to the script
         * @param submitShPath   path of the shell script to run
         * @param countDownLatch latch counted down exactly once when this thread finishes
         */
        public ImportThread(String objectId, String submitShPath, CountDownLatch countDownLatch) {
            this.objectId = objectId;
            this.submitShPath = submitShPath;
            this.countDownLatch = countDownLatch;
        }
    
        @Override
        public void run() {
            try {
                // Start marker. String concatenation (not toString()) so a null id cannot throw here.
                System.out.println(Thread.currentThread().getName() + "start... " + this.submitShPath + " " + this.objectId);
    
                int result = this.javaShellInvoker.executeShell("mrraster", this.submitShPath, this.objectId);
                if (result != 0) {
                    System.out.println(Thread.currentThread().getName() + " result type is error");
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(Thread.currentThread().getName() + "-error:" + e.getMessage());
            } finally {
                // Always count down, even if something unexpected escapes; otherwise the
                // waiting thread would block on await() forever.
                this.countDownLatch.countDown();
                System.out.println(Thread.currentThread().getName() + " complete,last " + this.countDownLatch.getCount() + " threads"); // end marker
            }
        }
    }

    执行sh的java代码:

    import java.io.File;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Runs a shell script through {@code sh}. The script's stdout and stderr are appended to
     * {@code ./executeShell_<type>_<yyyy-MM-dd>.log} in the current working directory.
     */
    public class JavaShellInvoker {
        private static final String executeShellLogFile = "./executeShell_%s_%s.log";
    
        /**
         * Runs {@code sh shellCommand args} and waits for it to finish.
         *
         * @param shellCommandType label used in the log file name
         * @param shellCommand     path of the script passed to {@code sh}
         * @param args             single argument passed to the script; {@code null} becomes {@code ""}
         * @return the script's exit code
         * @throws Exception if the process cannot be started (e.g. the log file cannot be
         *                   opened) or the calling thread is interrupted while waiting; in the
         *                   latter case the child process is destroyed and the interrupt flag restored
         */
        public int executeShell(String shellCommandType, String shellCommand, String args) throws Exception {
            args = (args == null) ? "" : args;
    
            // A new SimpleDateFormat per call, so no thread-safety concern across callers.
            String now = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
            File logFile = new File(String.format(executeShellLogFile, shellCommandType, now));
    
            ProcessBuilder pb = new ProcessBuilder("sh", shellCommand, args);
            pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
            pb.redirectError(ProcessBuilder.Redirect.appendTo(logFile));
    
            Process pid = null;
            try {
                pid = pb.start();
                return pid.waitFor();
            } catch (Exception ex) {
                System.out.println("executeShell-error:" + ex.getMessage());
                if (ex instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw ex;
            } finally {
                // pid is null when start() failed. If the process is still running, we got here
                // via an exception, so destroy it. Do not call exitValue(): it throws on a live
                // process and would mask the original exception.
                if (pid != null && pid.isAlive()) {
                    pid.destroy();
                }
            }
        }
    }

    submitsparkjob.sh

    #!/bin/sh
    # Submits the Spark job for one object id (passed as $1) to YARN in cluster mode.
    # Uses `.` rather than `source`: the caller runs this script with `sh`, and `source`
    # is a bash-ism that POSIX shells such as dash reject.
    # NOTE(review): ../login.sh and MySparkJobJar.jar are resolved against the caller's
    # working directory, not this script's directory — confirm that is intended.
    . ../login.sh
    spark-submit --master yarn-cluster \
        --class MySparkJobMainClass \
        --driver-class-path /app/myaccount/service/jars/ojdbc7.jar \
        --jars /app/myaccount/service/jars/ojdbc7.jar \
        --num-executors 20 \
        --driver-memory 6g \
        --executor-cores 1 \
        --executor-memory 8g \
        MySparkJobJar.jar "$1"

     执行 BatchSubmit.jar 的命令(需传入两个表名参数,分别对应 /myuser/hivedb/ 下的两个表目录):

    hadoop jar BatchSubmit.jar <mrTableName> <fglibTableName>
  • 相关阅读:
    文本特征选择的关键算法总结
    偏置-方差分解(Bias-Variance Decomposition)
    排列木桩
    七夕鹊桥分析
    第五十七课、模型视图设计模式(下)------------------狄泰软件学院
    第五十六课、模型视图设计模式(中)------------------狄泰软件学院
    第五十五课、模型视图设计模式(上)------------------狄泰软件学院
    第八十五课、多线程与界面组件的通信(下)------------------狄泰软件学院
    第八十四课、多线程与界面组件的通信(上)------------------狄泰软件学院
    第八十三课、另一种创建线程的方式------------------狄泰软件学院
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/7816917.html
Copyright © 2011-2022 走看看