zoukankan      html  css  js  c++  java
  • java海量数据处理(千万级别)(2)海量数据FTP下载

    这个也是曾经做过的一个程序,目的主要是去ftp主机(最多100左右)去取xx数据文件.

     

                千万级别仅仅是个概念,代表数据量等于千万或者大于千万的数据

                本分享不牵扯分布式採集存储之类的.是在一台机器上处理数据,假设数据量非常大非常大的话,能够考虑分布式处理,假设以后我有这方面的经验,会及时分享的.

     

    1、程序採用的ftp工具, apache 的 commons-net-ftp-2.0.jar

    2、千万级别ftp核心关键的部分--列文件夹到文件,仅仅要是这块做好了,基本上性能就沒有太大的问题了.

      能够通过apache 发送ftp命令 "NLST" 的方式列文件夹到文件里去

      # ftp列文件夹运行的命令 以环境变量的配置优先,不配置则使用默认的列文件夹方式 NLST

    Java代码 
    1. # DS_LIST_CMD = NLST   
    2. public File sendCommandAndListToFile(String command,String localPathName) throws   IOException   
    3.       {  
    4.               try {  
    5.                       return client.createFile(command, localPathName);  
    6.               } catch (IOException e) {  
    7.                       log.error(e);  
    8.           throw new IOException("the command "+command +" is incorrect");  
    9.               }  
    10.       }  

                当然应该还有其它形式的,大家能够自己研究一下

     

                十万级别以上的数据量的话千万不要使用下面这样的方式,假设用的话 ==== 找死

                 FTPFile[] dirList = client.listFiles();

      

    3、分批次从文件里读取 要下载的文件名称.  载入到内存中处理,或者读取一个文件名称就下载一个文件,不要把全部的数据都载入到内存,假设非常多的话会出问题

       

                为啥要分批次?

                由于是大数据量,假设有1000W条记录,列出来的文件夹文件的大小 1G以上吧

     

     

    4、文件下载的核心代码----关于文件的断点续传, 获得ftp文件的大小和本地文件的大小进行推断,然后使用ftp提供的断点续传功能

                下载文件一定要使用二进制的形式

                client.enterLocalPassiveMode();// 设置为被动模式

                ftpclient.binary();  // 一定要使用二进制模式

     

       

       

    Java代码 
    1. /** 下载所需的文件并支持断点续传,下载后删除FTP文件,以免反复 
    2.          * @param pathName 远程文件 
    3.          * @param localPath 本地文件 
    4.          * @param registerFileName 记录本文件名称称文件夹 
    5.          * @param size 上传文件大小 
    6.          * @return true 下载及删除成功 
    7.          * @throws IOException  
    8.          * @throws Exception 
    9.          */  
    10.         public boolean downLoad(String pathName, String localPath) throws IOException {  
    11.                 boolean flag = false;  
    12.                 File file = new File(localPath+".tmp");//设置暂时文件  
    13.                 FileOutputStream out = null;  
    14.                 try{  
    15.                     client.enterLocalPassiveMode();// 设置为被动模式  
    16.                     client.setFileType(FTP.BINARY_FILE_TYPE);//设置为二进制传输  
    17.                         if(lff.getIsFileExists(file)){//推断本地文件是否存在,假设存在而且长度小于FTP文件的长度时断点续传;返之新增  
    18.                                 long size = this.getSize(pathName);  
    19.                                 long localFileSize = lff.getSize(file);          
    20.                                 if(localFileSize > size){  
    21.                                         return false;  
    22.                                 }  
    23.                                 out = new FileOutputStream(file,true);  
    24.                                 client.setRestartOffset(localFileSize);  
    25.                                 flag = client.retrieveFile(new String(pathName.getBytes(),client.getControlEncoding()),out);  
    26.                                   
    27.                                 out.flush();  
    28.                         } else{  
    29.                                 out = new FileOutputStream(file);  
    30.                                 flag = client.retrieveFile(new String(pathName.getBytes(),client.getControlEncoding()),out);  
    31.                                   
    32.                                 out.flush();  
    33.                         }  
    34.                           
    35.                 }catch(IOException e){  
    36.                         log.error(e);  
    37.             log.error("file download error !");  
    38.                         throw e;  
    39.                 }finally{  
    40.                         try{  
    41.                                 if(null!=out)  
    42.                                 out.close();  
    43.                                 if(flag)  
    44.                                         lff.rename(file, localPath);  
    45.                         }catch(IOException e){  
    46.                                 throw e;  
    47.                         }  
    48.                 }  
    49.                 return flag;  
    50.         }  
    51.     /** 
    52.          * 获取文件长度 
    53.          * @param fileNamepath 本机文件 
    54.          * @return 
    55.          * @throws IOException 
    56.          */  
    57.         public long getSize(String fileNamepath) throws IOException{  
    58.                 FTPFile [] ftp = client.listFiles(new String(fileNamepath.getBytes(),client.getControlEncoding()));  
    59.                 return ftp.length==0 ? 0 : ftp[0].getSize();  
    60.         }  
    61.   
    62.         检測本地文件是否已经下载,假设下载文件的大小.  
    63.   
    64.        /** 
    65.          *本地文件的 获取文件的大小 
    66.          * @param file 
    67.          * @return 
    68.          */  
    69.         public long getSize(File file){  
    70.                 long size = 0;  
    71.                 if(getIsFileExists(file)){  
    72.                         size = file.length();  
    73.                 }  
    74.                 return size;  
    75.         }  

     

    5、由于程序要跑最多100多个线程,在线程监控上做了一些处理,能够检測那些死掉的线程,并及时的拉起来。

                t.setUncaughtExceptionHandler(new ThreadException(exList));

    原理:给每一个线程增加 UncaughtExceptionHandler,死掉的时候把线程相应的信息添加到一个list里面,然后让主线程每隔一段时间扫描一下list,假设有数据,直接又一次建一个线程运行就可以

     

    6、假设程序是常驻内存的话,别忘记了在finally中关闭掉 不用的ftp连接

     

    7、做大数据库採集程序必须考虑到的一件事情   磁盘空间已满的处理

           java虚拟机对于磁盘空间已满,在英文环境下的  linux aix 机器上 一般报

           There is not enough space in the file system

           中文环境下 一般报 "磁盘空间已满"

           大家能够使用以下的代码进行验证

           

                             

    Java代码 
    1. //检測磁盘控件是否已满的异常  
    Java代码 
    1. //linux aix There is not enough space in the file system  
    2.                      // window There is not enough space in the file system  
    3.                      if(e.toString().contains("enough space")||e.toString().contains("磁盘空间已满"))  
    4.                      {  
    5.                              log.error("channel "+channel_name + " There is not enough space on the disk ");  
    6.                              Runtime.getRuntime().exit(0);  
    7.                      }  

     

                <千万级别数据生成xml文件>  

                      原创文章 by dyllove98 @ http://jlins.iteye.com

                     原创文章 by dyllove98 @ http://blog.csdn.net/dyllove98

                            转载请表明出处。

  • 相关阅读:
    React Native 四:图片
    hadoop集群ambari搭建(1)之ambari-server安装
    OpenGL核心之视差映射
    大数据分析:结合 Hadoop或 Elastic MapReduce使用 Hunk
    在Android实现client授权
    HDU 5072 Coprime (单色三角形+容斥原理)
    mmtests使用简介
    win7 以管理员身份运行cmd, windows services 的创建和删除
    php使用curl设置超时的重要性
    window下查看端口命令
  • 原文地址:https://www.cnblogs.com/mfryf/p/3138708.html
Copyright © 2011-2022 走看看