zoukankan      html  css  js  c++  java
  • day07 eclipse使用本地 库文件 访问HDFS

    常用命令 

    1. hdfs dfsadmin -report   查看系统的各台机器状态

     HDFS的概念和特性
    首先,它是一个文件系统,用于存储文件,通过统一的命名空间——目录树来定位文件
    
    其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色;
    
    重要特性如下:
    (1)HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M
    
    (2)HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
    3)目录结构及文件分块信息(元数据)的管理由namenode节点承担
    ——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)
    
    
    (4)文件的各个block的存储管理由datanode节点承担
    ---- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication)
    
    (5)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

    1.  把haddop文件放在某I个位置用来引用

    2.

    3.

    4.进行jar包的添加

      其中把所有的common lib添加,然后添加

     接着把hdfs的所有lib添加  和  添加

    ------------------------------------------在Windows系统端配置HDFS环境------------------

    PS:因为系统的不同平台问题,  首先把  windos系统编译好的bin和lib文件替换掉,然后再到系统环境变量 里配置HADOOP_HOME环境变量,可以测试代码
    package cn.itcast.bigdata.hdfs;
    
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Before;
    import org.junit.Test;
    /**
     * 
     * 客户端去操作hdfs时,是有一个用户身份的
     * 默认情况下,hdfs客户端api会从jvm中获取一个参数来作为自己的用户身份:-DHADOOP_USER_NAME=hadoop
     * 
     * 也可以在构造客户端fs对象时,通过参数传递进去
     * @author
     *
     */
    public class Study {
        FileSystem fs = null;
        Configuration conf = null;
        @Before
        public void init() throws Exception{
            conf = new Configuration();
            //拿到一个文件系统的客户端实例
        //    fs = FileSystem.get(conf );
            conf.set("fs.defaultFS", "hdfs://192.168.8.10:9000");    //其实这个ip就是我的bee1
            
            //拿到一个文件系统操作的客户端实例对象
            /*fs = FileSystem.get(conf);*/
            //可以直接传入 uri和用户身份
            fs = FileSystem.get(new URI("hdfs://192.168.8.10:9000"),conf,"root"); //最后一个参数为用户名
        }
        
        @Test
        public void testUpload() throws Exception, IOException{
            Thread.sleep(2000);
            fs.copyFromLocalFile(false,new Path("D:/contact.xml"), new Path("/contact.xml.copy"));//这个false,是因为使用了win10系统
            fs.close();
        }
    }

    ******HDFS原理篇******

    4. hdfs的工作机制

    1.         HDFS集群分为两大角色:NameNodeDataNode  (Secondary Namenode)

    2.         NameNode负责管理整个文件系统的元数据(文件放置的位置,和切分大小)

    3.         DataNode 负责管理用户的文件数据块

    4.         文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode

    5.         每一个文件块可以有多个副本,并存放在不同的datanode

    6.         Datanode定期Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量

    7.         HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行

    -

     

     HDFS的概念和特性

    首先,它是一个文件系统,用于存储文件,通过统一的命名空间——目录树来定位文件

     

    其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色;

     

    重要特性如下:

    (1)HDFS中的文件在物理上是分块存储(block,块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M

    (2)HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data

    (3)目录结构及文件分块信息(元数据)的管理由namenode节点承担

    ——namenodeHDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(blockid,及所在的datanode服务器)

    (4)文件的各个block的存储管理由datanode节点承担

    ---- datanodeHDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication

    (5)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

    -----------------

    4.2 HDFS写数据流程

     

    4.2.1 概述

     

    客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后,客户端按顺序将文件逐个block传递给相应datanode,并由接收到block的datanode负责向其他datanode复制block的副本

     

    4.2.2 详细步骤图

    4.2.3 详细步骤解析

    PS:比如说要上次一个300M的电影。 
    1.想namenode请求说,你要上传的文件与路径。 namenode查找过以后说ok,然后就可以响应上传了
    2.请求第一个block,请返回打ndatanode,返回相应相应的设备(这里选择的选择策略还是比较复杂的,总之是为了数据的安全性)
    3.上面都好了,就可以传输了,他们会建好管道线,以packet传(每一个64k),中间有读写的缓冲层。

    1、根namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在

    2、namenode返回是否可以上传

    3、client请求第一个 block该传输到哪些datanode服务器上

    4、namenode返回3个datanode服务器ABC

    5、client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将真个pipeline建立完成,逐级返回客户端

    6、client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答

    7、当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。

    --------------------

     

    4.3. HDFS读数据流程

     

    4.3.1 概述

     

    客户端将要读取的文件路径发送给namenode,namenode获取文件的元信息(主要是block的存放位置信息)返回给客户端,客户端根据返回的信息找到相应datanode逐个获取文件的block并在客户端本地进行数据追加合并从而获得整个文件

     

    4.3.2 详细步骤图

     

     

    PS:客户端请求文件时,namenode会根据元数据进行查找  block的位置然后返回给客户端,然后客户端就可以根据文件的位置进行下载了。

    5. NAMENODE工作机制-------主要看/home/hadoop/hdpdata1文件

    5.1 NAMENODE职责

    NAMENODE职责:

    1 .   负责客户端请求的响应

    2 .  元数据的管理(查询,修改)

    5.2 元数据管理

    namenode对数据的管理采用了三种存储形式

    1.内存元数据(NameSystem)

    2.磁盘元数据镜像文件

    3.数据操作日志文件(可通过日志运算出元数据)

    5.2.1 元数据存储机制

    A、内存中有一份完整的元数据(内存meta data) 序列化

    B、磁盘有一个“准完整”的元数据镜像(fsimage)文件(在namenode的工作目录中)

    C、用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件注:当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存meta.data

    注:当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存meta.data

    5.2.2 元数据手动查看

    可以通过hdfs的一个工具来查看edits中的信息

    bin/hdfs oev -i edits -o edits.xml
    bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

    5.2.3 元数据的checkpoint

           每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,

    并加载到内存进行merge(这个过程称为checkpoint)

    PS: namenode是怎样工作的呢? 看上图,每当有新的文件上传的时候,namenode都会查询和返回,这件放置的位置都放在fsimage这个镜像文件中,但不是实时的。 
    namenode会把正在执行的操作,放入edits这个日志文件中。为了安全备份考虑,secondary namenode 它为 镜像文件和日志记录重新生成新的镜像,来跟新以前的镜像。
    namenode丢失的话,可以从secondary namenode找回来。但是找到也是之前的数据,终有一些是丢失的,所以要在多台设备上配置dfs.name.dir,保证数据的不丢失
    PS:可以看到namenode有128g大小 , hdfs适合放置大文件。
    --------------------------------------------------------------------------
    PS : hdpdata是防止 元数据的位置,如下图
    tree hdpdata

       dfs.namenode.name.dir属性可以配置多个目录,

    如/data1/dfs/name,/data2/dfs/name,/data3/dfs/name,....。各个目录存储的文件结构和内容都完全一样,相当于备份,这样做的好处是当其中一个目录损坏了,也不会影响到Hadoop的元数据,特别是当其中一个目录是NFS(网络文件系统Network File System,NFS)之上,即使你这台机器损坏了,元数据也得到保存。
    下面对$dfs.namenode.name.dir/current/目录下的文件进行解释。


    1、VERSION文件是Java属性文件,内容大致如下:


    #Fri Nov 15 19:47:46 CST 2013
    namespaceID=934548976
    clusterID=CID-cdff7d73-93cd-4783-9399-0a22e6dce196
    cTime=0
    storageType=NAME_NODE
    blockpoolID=BP-893790215-192.168.24.72-1383809616115
    layoutVersion=-47


    其中
      (1)namespaceID是文件系统的唯一标识符,在文件系统首次格式化之后生成的;
      (2)storageType说明这个文件存储的是什么进程的数据结构信息(如果是DataNode,storageType=DATA_NODE);
      (3)cTime表示NameNode存储时间的创建时间,由于我的NameNode没有更新过,所以这里的记录值为0,以后对NameNode升级之后,cTime将会记录更新时间戳;
      (4)ayoutVersion表示HDFS永久性数据结构的版本信息, 只要数据结构变更,版本号也要递减,此时的HDFS也需要升级,否则磁盘仍旧是使用旧版本的数据结构,这会导致新版本的NameNode无法使用;
      (5)clusterID是系统生成或手动指定的集群ID,在-clusterid选项中可以使用它;如下说明

                    a、使用如下命令格式化一个Namenode:

                    $HADOOP_HOME/bin/hdfs namenode -format [-clusterId <cluster_id>]

                    选择一个唯一的cluster_id,并且这个cluster_id不能与环境中其他集群有冲突。如果没有提供cluster_id,则会自动生成一个唯一的ClusterID。

                     b、使用如下命令格式化其他Namenode:

                     $HADOOP_HOME/bin/hdfs namenode -format -clusterId <cluster_id>

                     c、升级集群至最新版本。在升级过程中需要提供一个ClusterID,例如:

                     $HADOOP_PREFIX_HOME/bin/hdfs start namenode --config $HADOOP_CONF_DIR  -upgrade -clusterId <cluster_ID>

                     如果没有提供ClusterID,则会自动生成一个ClusterID。

      (6)blockpoolID:是针对每一个Namespace所对应的blockpool的ID,上面的这个BP-893790215-192.168.24.72-1383809616115就是在我的ns1的namespace下的存储块池的ID,这个ID包括了其对应的NameNode节点的ip地址。
      
    2、$dfs.namenode.name.dir/current/seen_txid非常重要,是存放transactionId的文件,format之后是0,它代表的是namenode里面的edits_*文件的尾数,namenode重启的时候,会按照seen_txid的数字,循序从头跑edits_0000001~到seen_txid的数字。所以当你的hdfs发生异常重启的时候,一定要比对seen_txid内的数字是不是你edits最后的尾数,不然会发生建置namenode时metaData的资料有缺少,导致误删Datanode上多余Block的资讯。

    3、$dfs.namenode.name.dir/current目录下在format的同时也会生成fsimage和edits文件,及其对应的md5校验文件。

     

    补充:seen_txid

    文件中记录的是edits滚动的序号,每次重启namenode时,namenode就知道要将哪些edits进行加载edits

      

    6. DATANODE的工作机制

    6.1 概述

    1、Datanode工作职责:

          a. 存储管理用户的文件块数据

           b.定期向namenode汇报自身所持有的block信息(通过心跳信息上报)

    (这点很重要,因为,当集群中发生某些block副本失效时,集群如何恢复block初始副本数量的问题)

    <property>

             <name>dfs.blockreport.intervalMsec</name>

             <value>3600000</value>  /**36000向namenode汇报一次信息*/

             <description>Determines block reporting interval in milliseconds.</description>

    </property>

    2、Datanode掉线判断时限参数

            datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:

             timeout  = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。

             而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。

             需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。

    /*内部的配置如图*/

    <property>

            <name>heartbeat.recheck.interval</name>

            <value>2000</value>

    </property>

    <property>

            <name>dfs.heartbeat.interval</name>

            <value>1</value>

    </property>

    6.2 观察验证DATANODE功能-----复习时可以忽略

    上传一个文件,观察文件的block具体的物理存放情况:

    在每一台datanode机器上的这个目录中能找到文件的切块:

    /home/hadoop/app/hadoop-2.4.1/tmp/dfs/data/current/BP-193442119-192.168.2.120-1432457

    ---------------------------HDFS访问API----------------------------------------------------------------------------------------------------

    package cn.itcast.bigdata.hdfs;
    
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.util.Iterator;
    import java.util.Map.Entry;
    
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.junit.Before;
    import org.junit.Test;
    /**
     * 
     * 客户端去操作hdfs时,是有一个用户身份的
     * 默认情况下,hdfs客户端api会从jvm中获取一个参数来作为自己的用户身份:-DHADOOP_USER_NAME=root
     * 
     * 也可以在构造客户端fs对象时,通过参数传递进去
     * @author
     *
     */
    public class Study {
        FileSystem fs = null;
        Configuration conf = null;
        @Before
        public void init() throws Exception{
            conf = new Configuration();
            //拿到一个文件系统的客户端实例
        //    fs = FileSystem.get(conf );
            conf.set("fs.defaultFS", "hdfs://192.168.8.10:9000");
            
            //拿到一个文件系统操作的客户端实例对象
            /*fs = FileSystem.get(conf);*/
            //可以直接传入 uri和用户身份
            fs = FileSystem.get(new URI("hdfs://192.168.8.10:9000"),conf,"root"); //最后一个参数为用户名
        }
        /**
         * 上传文件
         * @throws Exception
         * @throws IOException
         */
        @Test
        public void testUpload() throws Exception, IOException{
            Thread.sleep(2000);
            fs.copyFromLocalFile(false,new Path("D:/contact.xml"), new Path("/contact.xml.copy"));//这个false,是因为使用了win10系统
            fs.close();
        }
        /**
         * 下载文件
         * @throws Exception
         */
        @Test
        public void testDownload() throws Exception {
            
            fs.copyToLocalFile(false,new Path("/contact.xml.copy"), new Path("d:/"));
            fs.close();
        }
        /**
         * 测试conf的参数,输出的都是  配置的信息
         */
        @Test
        public void testConf(){
            Iterator<Entry<String, String>> it = conf.iterator();
            while(it.hasNext()){
                Entry<String, String> entry = it.next();
                System.out.println(entry.getKey()+":"+entry.getValue());
            }
        }
        /***
         * 创建文件
         * @throws Exception
         */
        @Test
        public void testMkdir() throws Exception {
            boolean flag = fs.mkdirs(new Path("/testmkdir/aaa/bbb"));
            System.out.println(flag);
        }
        
        @Test
        public void testDelete() throws Exception {
            boolean flag = fs.delete(new Path("/testmkdir/aaa"), true);
            System.out.println(flag);
        }
        
        /**
         * 递归列出指定目录下所有子文件夹中的文件
         * @throws Exception
         */
        @Test
        public void testLs() throws Exception {
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);//true是否迭代循环
            while(listFiles.hasNext()){
                LocatedFileStatus fileStatus = listFiles.next();//文件状态
                System.out.println("blocksize: " +fileStatus.getBlockSize());//
                System.out.println("owner: " +fileStatus.getOwner());
                System.out.println("Replication: " +fileStatus.getReplication());
                System.out.println("Permission: " +fileStatus.getPermission());
                System.out.println("Name: " +fileStatus.getPath().getName());
                System.out.println("------------------");
                BlockLocation[] blockLocations = fileStatus.getBlockLocations();
                for(BlockLocation b:blockLocations){
                    System.out.println("块起始偏移量: " +b.getOffset());
                    System.out.println("块长度:" + b.getLength());
                    //块所在的datanode节点
                    String[] datanodes = b.getHosts();
                    for(String dn:datanodes){
                    System.out.println("datanode:" + dn);
                    }
                }
            }
        }
        /**
         * 手动遍历
         * @throws Exception
         */
        @Test
        public void testLs2() throws Exception {//手动遍历,一层一层遍历
            
            FileStatus[] listStatus = fs.listStatus(new Path("/"));
            for(FileStatus file :listStatus){
                
                System.out.println("name: " + file.getPath().getName());
                System.out.println((file.isFile()?"file":"directory"));
                
            }
            
        }
        
    ----------------------------通过流的方式访问hdfs
    /**
    * 用流的方式来操作hdfs上的文件
    * 可以实现读取指定偏移量范围的数据
    * @author
    *
    */-------------------------------------------------------下面是另一个文件了
    /** * 通过流的方式上传文件到hdfs * @throws Exception */ @Test public void testUpload1() throws Exception { FSDataOutputStream outputStream = fs.create(new Path("/angelababy.love"), true);//true是否重写 FileInputStream inputStream = new FileInputStream("d:/angelababy.love"); IOUtils.copy(inputStream, outputStream); }
        /**
         * 通过流的方式获取hdfs上数据
         * @throws Exception
         */
        @Test
        public void testDownLoad1() throws Exception {
            FSDataInputStream inputStream = fs.open(new Path("/angelababy.love"));        
            FileOutputStream outputStream = new FileOutputStream("d:/angelababy.txt");
            IOUtils.copy(inputStream, outputStream);
        }
        
        /**
         * 读取流的一部分
         * @throws Exception
         */
        @Test
        public void testRandomAccess() throws Exception{
            
            FSDataInputStream inputStream = fs.open(new Path("/angelababy.love"));
        
            inputStream.seek(12);
            
            FileOutputStream outputStream = new FileOutputStream("d:/angelababy.love.txt");
            
            IOUtils.copy(inputStream, outputStream);
            
        }
        
        /**
         * 显示hdfs上文件的内容
         * @throws IOException 
         * @throws IllegalArgumentException 
         */
        @Test
        public void testCat() throws IllegalArgumentException, IOException{
            
            FSDataInputStream in = fs.open(new Path("/angelababy.love"));
            
            IOUtils.copy(in, System.out);
            
    //        IOUtils.copyBytes(in, System.out, 1024);
        }
        
    }

     

    ---------------------------------这个没有做

    9. 案例2:开发JAVA采集程序

    9.1 需求

    从外部购买数据,数据提供方会实时将数据推送到6台FTP服务器上,我方部署6台接口采集机来对接采集数据,并上传到HDFS中

    提供商在FTP上生成数据的规则是以小时为单位建立文件夹(2016-03-11-10),每分钟生成一个文件(00.dat,01.data,02.dat,........)

    提供方不提供数据备份,推送到FTP服务器的数据如果丢失,不再重新提供,且FTP服务器磁盘空间有限,最多存储最近10小时内的数据

    由于每一个文件比较小,只有150M左右,因此,我方在上传到HDFS过程中,需要将15分钟时段的数据合并成一个文件上传到HDFS

    为了区分数据丢失的责任,我方在下载数据时最好进行校验

     



    PS :就是收集起群上日志合并文件,根据下面的文件创建目录,弄个程序一直生成日志,最后跑脚本
    ===============================
    #!/bin/bash #set java env export JAVA_HOME=/home/hadoop/app/jdk1.7.0_51 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH #set hadoop env export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.4 export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH #版本1的问题: #虽然上传到Hadoop集群上了,但是原始文件还在。如何处理? #日志文件的名称都是xxxx.log1,再次上传文件时,因为hdfs上已经存在了,会报错。如何处理? #如何解决版本1的问题 # 1、先将需要上传的文件移动到待上传目录 # 2、在讲文件移动到待上传目录时,将文件按照一定的格式重名名 # /export/software/hadoop.log1 /export/data/click_log/xxxxx_click_log_{date} #日志文件存放的目录 log_src_dir=/home/hadoop/logs/log/ #待上传文件存放的目录 log_toupload_dir=/home/hadoop/logs/toupload/ #日志文件上传到hdfs的根路径 hdfs_root_dir=/data/clickLog/20151226/ #打印环境变量信息 echo "envs: hadoop_home: $HADOOP_HOME" #读取日志文件的目录,判断是否有需要上传的文件 echo "log_src_dir:"$log_src_dir ls $log_src_dir | while read fileName do if [[ "$fileName" == access.log.* ]]; then # if [ "access.log" = "$fileName" ];then date=`date +%Y_%m_%d_%H_%M_%S` #将文件移动到待上传目录并重命名 #打印信息 echo "moving $log_src_dir$fileName to $log_toupload_dir"xxxxx_click_log_$fileName"$date" mv $log_src_dir$fileName $log_toupload_dir"xxxxx_click_log_$fileName"$date #将待上传的文件path写入一个列表文件willDoing echo $log_toupload_dir"xxxxx_click_log_$fileName"$date >> $log_toupload_dir"willDoing."$date fi done #找到列表文件willDoing ls $log_toupload_dir | grep will |grep -v "_COPY_" | grep -v "_DONE_" | while read line do #打印信息 echo "toupload is in file:"$line #将待上传文件列表willDoing改名为willDoing_COPY_ mv $log_toupload_dir$line $log_toupload_dir$line"_COPY_" #读列表文件willDoing_COPY_的内容(一个一个的待上传文件名) ,此处的line 就是列表中的一个待上传文件的path cat $log_toupload_dir$line"_COPY_" |while read line do #打印信息 echo "puting...$line to hdfs path.....$hdfs_root_dir" hadoop fs -put $line $hdfs_root_dir done mv $log_toupload_dir$line"_COPY_" $log_toupload_dir$line"_DONE_" done
  • 相关阅读:
    2.六角星绘制
    1.五角星绘制
    Redis
    javaScript
    反射
    区分'方法'和'函数'
    递归,二分法
    匿名函数,排序函数,过滤函数,映射函数,
    生成器,生成器函数,推导式,生成器表达式.
    函数,闭包,迭代器
  • 原文地址:https://www.cnblogs.com/bee-home/p/7866397.html
Copyright © 2011-2022 走看看