zoukankan      html  css  js  c++  java
  • hadoop hdfs 上传下载文件

    上传文件:

    package uploadfile;
    
    import java.io.*;
    import java.net.URI;
    import java.util.Date;
    
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    
    public class UploadFile {
    
    	public static void main(String[] args) {
    		
    		try{
    			
    			long begin = System.currentTimeMillis();
    			System.out.println("*******************开始 时间 " + new Date() + "************************");
    			String localSrc = args[0];  //D://cloudra cdh4.txt
    			String dst=args[1];
    	
    			System.out.println(localSrc);
    			System.out.println(dst);
    			
    			InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
    			
    //			byte[] a=new byte[1024];
    //			if(in.read(a)!=-1){
    //				String as = new String(a);
    //				System.out.println(as);
    //			}
    			
    			Configuration conf = new Configuration();
    			FileSystem fs = FileSystem.get(URI.create(dst),conf);
    	
    			OutputStream out = fs.create(new Path(dst));
    	
    			IOUtils.copyBytes(in,out,4096,true);
    			
    			long end = System.currentTimeMillis();
    			long excutetime = end - begin ;
    			System.out.println("*******************结束 时间 " + new Date() + "************************");
    			
    			System.out.println();
    			System.out.println("====================");
    			System.out.println("消耗时间为:" + excutetime + " 毫秒");
    			System.out.println("          " + excutetime/1000 + "秒");
    			System.out.println("          " + excutetime/60000 + "分");
    			System.out.println("====================");
    			System.out.println();
    			
    		}catch(Exception e){
    			e.printStackTrace();
    		}
    	}
    
    }
    


    下载文件:

    package uploadfile;
      
    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    /**  
     * *  
     * * 
     * *Description: 查看Hadoop文件系统中的文件,利用hadoop FileSystem接口中的FSDataInputStream  
     * * FSDataInputStream还具有流定位的能力,可以从文件的任意位置开始读取  
     * *  
     * * @author charles.wang  
     * * @created May 26, 2012 12:28:49 PM  
     * */ 
    public class DownloadFile {      
    	
    	/**      * @param args      */     
    	public static void main(String[] args) throws Exception{    
    		
    		//第一个参数传递进来的是Hadoop文件系统中的某个文件的URI,以hdfs://ip 的theme开头         
    		String uri = args[0];      
    		
    		//读取Hadoop文件系统的配置         
    		Configuration conf = new Configuration();         
    		conf.set("Hadoop.job.ugi", "hadoop-user,hadoop-user");    
    		
    		//FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统         
    		FileSystem fs = FileSystem.get(URI.create(uri),conf);         
    		FSDataInputStream in = null;    
    		
    		try{         
    			
    			//实验一:输出全部文件内容             
    			System.out.println("实验一:输出全部文件内容");     
    			
    			//让FileSystem打开一个uri对应的FSDataInputStream文件输入流,读取这个文件             
    			in = fs.open( new Path(uri) ); 
    			
    			//用Hadoop的IOUtils工具方法来让这个文件的指定字节复制到标准输出流上             
    			//IOUtils.copyBytes(in, System.out,50,false);   //输出到控制台上  
    			OutputStream out = new BufferedOutputStream(new FileOutputStream("D:\\aassdcc.txt"));
    			IOUtils.copyBytes(in, out,4096,true);
    			
    			System.out.println("Download success!!!");                  
    			
    //			//实验二:展示FSDataInputStream文件输入流的流定位能力,用seek进行定位             
    //			System.out.println("实验二:展示FSDataInputStream文件输入流的流定位能力,用seek进行定位");   
    //			
    //			//假如我们要吧文件输出3次             
    //			//第一次输入全部内容,第二次输入从第20个字符开始的内容,第3次输出从第40个字符开始的内容             
    //			for (int i=1;i<=3;i++){                 
    //				in.seek(0+20*(i-1));                 
    //				System.out.println("流定位第 "+i+" 次:" );                 
    //				IOUtils.copyBytes(in, System.out,4096,false);    
    //				
    //			}         
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{             
    			IOUtils.closeStream(in);        
    		}      
    	}  
    }
    	

    总结:

    这里采用的方法是通过 FsUrlStreamHandlerFactory 实例调用URL 中的setURLStreamHandlerFactory 方法。由于JAVA 虚拟机只能用一次上述方法,因此 通常在静态方法中调用上述方法。这个限制意味首如果程序的其他组件--如不受你控制的第三方组件--已经声明了一个URL实例,你将无法再使用上述方法从Hadoop 中读取数据。

    我们可以调用Hadoop 中简洁的IOUtils 类,并在finally子句中关闭数据流,同时也可以在输入流和输出流之间复制数据。copyBytes方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用于设置复制结束后是否关闭数据流。

  • 相关阅读:
    pycharm 社区版运行flask app相关配置
    飞冰框架学习记录
    从上一次到现在总结2
    从上一次到今天的总结1
    mybatis 遇到空串无法判断
    Shell 脚本入门
    数据库批量插入数据
    Navicat for mysql 实现数据库自动备份
    自定义校验注解
    C++ 提高编程
  • 原文地址:https://www.cnblogs.com/java20130722/p/3206949.html
Copyright © 2011-2022 走看看