  • Downloading HDFS files by modification time

    Use case: downloading files from table directories created by different users. The program shells out to a hadoop cat command to pull each file to local disk, ships it to the target server via FTP, and records the modification time of the HDFS directory in MySQL. Before each run, the times recorded in MySQL are compared against the modification times of the HDFS paths in the current batch; a file is downloaded only if its time has changed:
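
    The only persistent state is a small MySQL table, download_time, keyed by HDFS path. A minimal sketch of creating it over JDBC follows; the table and column names come from the queries in the code below, while the column types and connection details are assumptions:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateDownloadTimeTable {
        public static void main(String[] args) throws Exception {
            // placeholder URL; the real values come from download_mysql.txt (see LoadHdfsConf)
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/edm?user=root&password=secret");
                 Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("CREATE TABLE IF NOT EXISTS download_time ("
                        + "path VARCHAR(512) NOT NULL PRIMARY KEY, "
                        + "modify_time BIGINT NOT NULL)");
            }
        }
    }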

    Entry point:

    package edm.spark.download.edm.spark.download;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.Date;
    import java.util.List;

    import org.apache.hadoop.fs.Path;

    import edm.spark.download.edm.spark.util.HdfsFileProcessor;
    import edm.spark.download.edm.spark.util.JdbcDirectUtils;

    public class FileDownload {

        public static void main(String[] args) throws Exception {
            String local_path = args[0]; // e.g. "/home/hdfs/ysy/"
            String hdfs_path = args[1];  // e.g. "hdfs://hdp/user/"
            HdfsFileProcessor fileProcessor = new HdfsFileProcessor();
            List<String> userLists = fileProcessor.getUserUnderFolder(hdfs_path);
            List<Path> listPath = fileProcessor.getFileUnderFolder(userLists);
            if (null != listPath && !listPath.isEmpty()) {
                for (Path path : listPath) {
                    String pathName = path.toString();
                    String[] nameList = pathName.split("/");
                    String time = JdbcDirectUtils.DateTimeFormat(new Date());
                    String tableName = nameList[nameList.length - 1] + "_" + time + ".txt";
                    String userName = nameList[nameList.length - 3];
                    Process ps = null;
                    try {
                        // launch the local download script
                        ps = Runtime.getRuntime().exec(
                                local_path + "download.sh " + pathName + " "
                                        + tableName + " " + userName);
                        System.out.println(local_path + "download.sh " + pathName
                                + " " + tableName);
                        // update the modification time recorded in MySQL
                        JdbcDirectUtils jdbcForTime = new JdbcDirectUtils();
                        long dateTime = jdbcForTime
                                .queryDate("select modify_time,path from download_time where path="
                                        + "'" + path.toString() + "'");
                        long insertTime = fileProcessor.getModificationTime(path);
                        if (dateTime != 0) {
                            jdbcForTime.updateDateTime(insertTime, pathName);
                        } else {
                            // first run for this path: insert the current directory time
                            jdbcForTime.insertDate(insertTime, path.toString());
                        }
                        jdbcForTime.destroy();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (ps == null) {
                        continue; // exec failed, nothing to read
                    }
                    // collect the script's output
                    BufferedReader br = new BufferedReader(new InputStreamReader(
                            ps.getInputStream()));
                    String line;
                    StringBuffer sb = new StringBuffer();
                    while ((line = br.readLine()) != null) {
                        sb.append(line).append("\n");
                    }
                    String result = sb.toString();
                    System.out.println(result);
                    ps.destroy();
                }
            } else {
                System.out.println("no file to download");
            }
        }
    }
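
    One caveat in the entry point: only stdout of download.sh is drained, so a script that writes heavily to stderr can fill its pipe buffer and hang. A sketch of the same call using ProcessBuilder with the two streams merged (variable names as in the loop above):

    // hedged alternative to Runtime.exec: merge stderr into stdout and wait for exit
    ProcessBuilder pb = new ProcessBuilder(
            local_path + "download.sh", pathName, tableName, userName);
    pb.redirectErrorStream(true);
    Process ps = pb.start();
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(ps.getInputStream()))) {
        String line;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
    }
    int exit = ps.waitFor(); // non-zero exit means the script failed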

    HdfsFileProcessor:

    package edm.spark.download.edm.spark.util;

    import java.io.IOException;
    import java.sql.SQLException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.security.AccessControlException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.google.common.collect.Lists;

    public class HdfsFileProcessor {

        static final Logger logger = LoggerFactory.getLogger(HdfsFileProcessor.class);

        protected FileSystem fileSystem;

        private Configuration conf;

        public HdfsFileProcessor() {
            init();
        }

        public void init() {
            conf = new Configuration();
            conf.addResource("resources/hdfs-site.xml");
            conf.addResource("resources/core-site.xml");
            try {
                fileSystem = FileSystem.get(conf);
            } catch (IOException e) {
                logger.error("init error.......", e);
            }
        }

        public final boolean checkFile(String filePath) {
            boolean exists = false;
            try {
                Path path = new Path(filePath);
                exists = fileSystem.exists(path);
            } catch (IOException e) {
                logger.error("", e);
            } catch (Exception e) {
                logger.error("", e);
            }
            return exists;
        }

        public List<Path> getFileUnderFolder(List<String> names) throws IOException, SQLException {
            JdbcDirectUtils jdbcForTime = new JdbcDirectUtils();
            List<Path> paths = Lists.newArrayList();
            for (String name : names) {
                Path folderPath = new Path("hdfs://hdp/user/" + name + "/");
                if (fileSystem.exists(folderPath)) {
                    try {
                        FileStatus[] fileStatus = fileSystem.listStatus(folderPath);
                        for (int i = 0; i < fileStatus.length; i++) {
                            Path path = fileStatus[i].getPath();
                            if (path.toString().contains("tosas")) {
                                FileStatus[] tableStatus = fileSystem.listStatus(path);
                                for (int j = 0; j < tableStatus.length; j++) {
                                    Path tablePath = tableStatus[j].getPath();
                                    long modificationTime = fileSystem.getFileStatus(tablePath).getModificationTime();
                                    long dataTime = jdbcForTime.queryDate("select modify_time,path from download_time where path="
                                            + "'" + tablePath.toString() + "'");
                                    // keep only paths that changed since the last recorded run
                                    if (modificationTime > dataTime) {
                                        paths.add(tablePath);
                                    }
                                }
                            }
                        }
                    } catch (RemoteException e) {
                        logger.error("", e);
                    } catch (AccessControlException e) {
                        logger.error("", e);
                    }
                }
            }

            return paths;
        }

        /**
         * Return the modification time of the given HDFS path.
         * @param path
         * @return
         * @throws IOException
         */
        public long getModificationTime(Path path) throws IOException {
            return fileSystem.getFileStatus(path).getModificationTime();
        }

        /**
         * List the user names under the given folder.
         * @param path
         * @return
         * @throws Exception
         */
        public List<String> getUserUnderFolder(String path) throws Exception {
            List<String> userList = Lists.newArrayList();
            Path userPath = new Path(path);
            if (fileSystem.exists(userPath)) {
                FileStatus[] fileStatus = fileSystem.listStatus(userPath);
                for (int i = 0; i < fileStatus.length; i++) {
                    // "hdfs://hdp/user/<name>" splits into [hdfs:, , hdp, user, <name>]
                    String[] pathes = fileStatus[i].getPath().toString().split("/");
                    if (pathes.length > 4) {
                        userList.add(pathes[4]);
                    }
                }
            }
            return userList;
        }

        public void destroy() throws IOException {
            if (fileSystem != null) {
                fileSystem.close();
            }
            fileSystem = null;
        }
    }
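
    A quick usage sketch of the two helpers together, assuming the classes above are on the classpath; the path is illustrative:

    import java.util.List;
    import org.apache.hadoop.fs.Path;

    public class DownloadPreview {
        public static void main(String[] args) throws Exception {
            HdfsFileProcessor fp = new HdfsFileProcessor();
            // list the user directories under /user, then collect the paths whose
            // modification time is newer than the one recorded in MySQL
            List<String> users = fp.getUserUnderFolder("hdfs://hdp/user/");
            List<Path> changed = fp.getFileUnderFolder(users);
            for (Path p : changed) {
                System.out.println(p + " modified at " + fp.getModificationTime(p));
            }
            fp.destroy();
        }
    }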

    JdbcDirectUtils:

    package edm.spark.download.edm.spark.util;

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;

    import com.google.common.collect.Maps;

    public class JdbcDirectUtils {

        private static Connection conn;

        private Statement stmt;

        private String file_dir = "/template/download_mysql.txt";

        private Map<String, String> jdbcConfMap = Maps.newHashMap();

        private LoadHdfsConf mysqlConf;

        public JdbcDirectUtils() {
            initDriver();
        }

        public void initDriver() {
            try {
                if (conn == null) {
                    mysqlConf = new LoadHdfsConf();
                    jdbcConfMap = mysqlConf.readHdfsFile(file_dir);
                    Class.forName("com.mysql.jdbc.Driver");
                    String url = "jdbc:mysql://" + jdbcConfMap.get("url") + ":"
                            + jdbcConfMap.get("port") + "/"
                            + jdbcConfMap.get("schema") + "?user="
                            + jdbcConfMap.get("user") + "&password="
                            + jdbcConfMap.get("password")
                            + "&useUnicode=true&characterEncoding="
                            + jdbcConfMap.get("characterEncoding");
                    conn = DriverManager.getConnection(url);
                }
                // create the statement used by the query/update helpers below
                stmt = conn.createStatement();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /**
         * Update the recorded modification time for a path.
         * @param date
         * @param path
         * @throws SQLException
         */
        public void updateDateTime(long date, String path) throws SQLException {
            stmt.executeUpdate("update download_time set modify_time=" + date + " where path=" + "'" + path + "'");
        }

        public long queryDate(String sql) throws SQLException {
            ResultSet rs = stmt.executeQuery(sql);
            long dateTime = 0;
            while (rs.next()) {
                dateTime = rs.getLong("modify_time");
            }
            rs.close();
            return dateTime;
        }

        public void insertDate(Long date, String path) throws SQLException {
            stmt.executeUpdate("insert into download_time(path,modify_time) values " + "('" + path + "'" + "," + date + ")");
        }

        /**
         * Convert a yyyyMMdd date string to a long timestamp.
         * @param date
         * @return
         */
        public long convert2Long(String date) {
            long time = 0;
            String format = "yyyyMMdd";
            SimpleDateFormat sf = new SimpleDateFormat(format);
            try {
                time = sf.parse(date).getTime();
            } catch (java.text.ParseException e) {
                e.printStackTrace();
            }
            return time;
        }

        public static String DateTimeFormat(Date date) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            return sdf.format(date);
        }

        public void destroy() throws SQLException {
            if (stmt != null) {
                stmt.close();
            }
            if (conn != null) {
                conn.close();
            }
            conn = null;
        }
    }
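
    Because paths are spliced directly into the SQL strings, a path containing a quote character would break (or inject into) the statement. A hedged sketch of the same three operations with PreparedStatement, against the same table and columns:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class DownloadTimeDao {

        private final Connection conn;

        public DownloadTimeDao(Connection conn) {
            this.conn = conn;
        }

        // parameterized equivalent of queryDate(): 0 means the path has no record yet
        public long queryDate(String path) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(
                    "select modify_time from download_time where path = ?")) {
                ps.setString(1, path);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? rs.getLong("modify_time") : 0L;
                }
            }
        }

        // parameterized equivalent of updateDateTime()
        public void updateDateTime(long date, String path) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(
                    "update download_time set modify_time = ? where path = ?")) {
                ps.setLong(1, date);
                ps.setString(2, path);
                ps.executeUpdate();
            }
        }

        // parameterized equivalent of insertDate()
        public void insertDate(long date, String path) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(
                    "insert into download_time(path, modify_time) values (?, ?)")) {
                ps.setString(1, path);
                ps.setLong(2, date);
                ps.executeUpdate();
            }
        }
    }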

    LoadHdfsConf:

    package edm.spark.download.edm.spark.util;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.IOUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.google.common.collect.Maps;

    public class LoadHdfsConf {
        static final Logger logger = LoggerFactory.getLogger(LoadHdfsConf.class);

        protected FileSystem fileSystem;

        public final boolean checkFile(String filePath) {
            boolean exists = false;
            try {
                Path path = new Path(filePath);
                exists = fileSystem.exists(path);
            } catch (Exception e) {
                logger.error("", e);
            }
            return exists;
        }

        public Map<String, String> readHdfsFile(String hdfsPath) throws IOException {
            Configuration conf = new Configuration();
            conf.addResource("resources/hdfs-site.xml");
            conf.addResource("resources/core-site.xml");
            fileSystem = FileSystem.get(conf);
            Path path = new Path(hdfsPath);
            InputStream in = fileSystem.open(path);
            List<String> lines = IOUtils.readLines(in);
            IOUtils.closeQuietly(in);
            if (null == lines || lines.isEmpty()) {
                return null;
            }
            Map<String, String> map = Maps.newHashMap();
            int rowNum = 0;
            for (String line : lines) {
                rowNum++;
                // validate before indexing into the split result
                String[] content = line.split("=");
                if (StringUtils.isEmpty(line) || content.length < 2
                        || StringUtils.isEmpty(content[1])) {
                    logger.error("skip invalid config line {}: {}", rowNum, line);
                    continue;
                }
                map.put(content[0], content[1]);
            }
            return map;
        }
    }
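
    readHdfsFile expects one key=value pair per line. Judging from the keys looked up in JdbcDirectUtils, /template/download_mysql.txt would look roughly like this (all values are placeholders):

    url=mysql01.example.com
    port=3306
    schema=edm
    user=edm_user
    password=secret
    characterEncoding=UTF-8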
  • Original post: https://www.cnblogs.com/yangsy0915/p/6510586.html