zoukankan      html  css  js  c++  java
  • 大数据之路week07--day01(HDFS学习,Java代码操作HDFS,将HDFS文件内容存入到Mysql)

    一、HDFS概述

    数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 。

    是一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。

    通透性。让实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般。

    容错。即使系统中有某些节点宕机,整体来说系统仍然可以持续运作而不会有数据损失【通过副本机制实现】。

    分布式文件管理系统很多,hdfs只是其中一种,不合适小文件。

                                 HDFS结构

    二、NameNode

    NameNode是整个文件系统的管理节点。它维护着整个文件系统的文件目录树,文件/目录的元信息和每个文件对应的数据块列表。接收用户的操作请求。

    文件包括:

      fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。

      edits:操作日志文件,namenode启动后一些新增元信息日志。

      fstime:保存最近一次checkpoint的时间

    以上这些文件是保存在linux的文件系统中。

      hdfs-site.xml的dfs.namenode.name.dir属性

    三、DataNode

    提供真实文件数据的存储服务。

    文件块(block):最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。2.0以后HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block.(注意,这里不是说,重要大于128M就会产生一个block块,默认是128M,用户可以自己更改的,但是我们一般不去更改)

      hdfs-site.xml中dfs.blocksize属性

    不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间 Replication。多复本。默认是三个。

       hdfs-site.xml的dfs.replication属性

    四、Client读取多副本文件过程

    五、HDFS的Trash回收站

    和Linux系统(桌面环境)的回收站设计一样,HDFS会为每一个用户创建一个回收站目录:/user/用户名/.Trash/,每一个被用户通过Shell删除的文件/目录,fs.trash.interval是在指在这个回收周期之内,文件实际上是被移动到trash的这个目录下面,而不是马上把数据删除掉。等到回收周期真正到了以后,hdfs才会将数据真正删除。默认的单位是分钟,1440分钟=60*24,刚好是一天。 配置:在每个节点(不仅仅是主节点)上添加配置 core-site.xml,增加如下内容

    <property>

      <name>fs.trash.interval</name>

      <value>1440</value>

    </property>

    注意:如果删除的文件过大,超过回收站大小的话会提示删除失败

      需要指定参数 -skipTrash

    六、通过Java代码去操作HDFS(这里使用Myeclipse 或者 IDEA  或者 eclipse都是可以的)

    1、创建一个maven项目

    2、修改pom.xml文件,将这几个有关依赖添加进去,保存。

     1 <dependency>
     2             <groupId>junit</groupId>
     3             <artifactId>junit</artifactId>
     4             <version>4.11</version>
     5             <scope>test</scope>
     6         </dependency>
     7 
     8         <dependency>
     9             <groupId>mysql</groupId>
    10             <artifactId>mysql-connector-java</artifactId>
    11             <version>5.1.17</version>
    12         </dependency>
    13 
    14         <dependency>
    15             <groupId>org.apache.hadoop</groupId>
    16             <artifactId>hadoop-common</artifactId>
    17             <version>2.6.0</version>
    18         </dependency>
    19 
    20         <dependency>
    21             <groupId>org.apache.hadoop</groupId>
    22             <artifactId>hadoop-client</artifactId>
    23             <version>2.6.0</version>
    24         </dependency>
    25 
    26         <dependency>
    27             <groupId>org.apache.hadoop</groupId>
    28             <artifactId>hadoop-hdfs</artifactId>
    29             <version>2.6.0</version>
    30         </dependency>

    3、编写Java代码,连接到HDFS(我这里没有导包,注意不要导错包,Hadoop下的)

     1 public class hdfsDemo2 {
     2     public static void main(String[] args) throws Exception {
     3         //
     4         URI uri = new URI("hdfs://192.168.230.50:9000"); //输入你的namenode的节点信息
     5         Configuration conf = new Configuration();
     6         FileSystem fs = FileSystem.get(uri, conf);
     7 //        method2(fs);
     8 
     9         method1(fs);
    10     }

    4、简单的操作,对HDFS创建文件夹以及删除文件夹

     1 import org.apache.hadoop.conf.Configuration;
     2 import org.apache.hadoop.fs.FSDataInputStream;
     3 import org.apache.hadoop.fs.FileSystem;
     4 import org.apache.hadoop.fs.Path;
     5 
     6 import java.io.*;
     7 import java.net.URI;
     8 
     9 public class hdfsDemo2 {
    10     public static void main(String[] args) throws Exception {
    11         //
    12         URI uri = new URI("hdfs://192.168.230.50:9000");
    13         Configuration conf = new Configuration();
    14         FileSystem fs = FileSystem.get(uri, conf);
    15 //        method2(fs);
    16 
    17         method1(fs);
    18     }
    19 
    20 
    21 
    22     private static void method2(FileSystem fs) throws IOException {
    23         boolean b1 = fs.mkdirs(new Path("/data/"));
    24         System.out.println(b1);
    25 
    26         boolean b2 = fs.delete(new Path("/data/"),true);
    27         System.out.println(b2);
    28     }
    29 
    30 
    31 
    32     private static void method1(FileSystem fs) throws IOException, UnsupportedEncodingException, FileNotFoundException {
    33         //从hdfs上读取数据
    34         FSDataInputStream in = fs.open(new Path("/usr/test/empldata.csv"));
    35         BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
    36         String line = null;
    37         while((line=br.readLine())!=null){
    38             System.out.println(line);
    39         }
    40         in.close();
    41 
    42         //从本地上传到hdfs上面
    43         /*FileInputStream fi = new FileInputStream("F:\新桌面\dianxin_data");
    44         FSDataOutputStream fo = fs.create(new Path("/usr/test/hdfstest.txt"));
    45         IOUtils.copyBytes(fi, fo, 1024, true);*/
    46     }
    47 }

    七、将HDFS的文件读出到JVM,再存入到Mysql数据库中

    1、先在数据库中建表并插入数据

     1 DROP TABLE IF EXISTS  `emp`;
     2 CREATE TABLE `emp` (
     3   `EMPNO` int(4) NOT NULL,
     4   `ENAME` varchar(10) DEFAULT NULL,
     5   `JOB` varchar(9) DEFAULT NULL,
     6   `MGR` varchar(10) DEFAULT NULL,
     7   `HIREDATE` date DEFAULT NULL,
     8   `SAL` int(7) DEFAULT NULL,
     9   `COMM` int(7) DEFAULT NULL,
    10   `DEPTNO` int(2) DEFAULT NULL,
    11   PRIMARY KEY (`EMPNO`)
    12 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    13 字段中文名字依次是:工号,姓名,工作岗位,部门经理,受雇日期,薪金,奖金,部门编号
    14 insert into `emp`(`EMPNO`,`ENAME`,`JOB`,`MGR`,`HIREDATE`,`SAL`,`COMM`,`DEPTNO`) values
    15 ('7369','SMITH','CLERK','7902','1980-12-17','800',null,'50'),
    16 ('7499','ALLEN','SALESMAN','7698','1981-02-20','1600','300','50'),
    17 ('7521','WARD','SALESMAN','7698','1981-02-22','1250','500','30'),
    18 ('7566','JONES','MANAGER','7839','1981-04-02','2975',null,'20'),
    19 ('7654','MARTIN','SALESMAN','7698','1981-09-28','1250','1400','30'),
    20 ('7698','BLAKE','MANAGER','7839','1981-05-01','2850',null,'30'),
    21 ('7782','CLARK','MANAGER','7839','1981-06-09','2450',null,'10'),
    22 ('7788','SCOTT','ANALYST','7566','1987-04-19','3000',null,'20'),
    23 ('7839','KING','PRESIDENT',null,'1981-11-17','5000',null,'10'),
    24 ('7844','TURNER','SALESMAN','7698','1981-09-08','1500','0','30'),
    25 ('7876','ADAMS','CLERK','7788','1987-05-23','1100',null,'20'),
    26 ('7900','JAMES','CLERK','7698','1981-12-03','950',null,'30'),
    27 ('7902','FORD','ANALYST','7566','1981-12-03','3000',null,'20'),
    28 ('7934','MILLER','CLERK','7782','1982-01-23','1300',null,'10');
    29 
    30 DROP TABLE IF EXISTS  `dept`;
    31 CREATE TABLE `dept` (
    32   `DEPTNO` int(2) NOT NULL,
    33   `DNAME` varchar(14) DEFAULT NULL,
    34   `LOC` varchar(13) DEFAULT NULL,
    35   PRIMARY KEY (`DEPTNO`)
    36 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    37 
    38 insert into `dept`(`DEPTNO`,`DNAME`,`LOC`) values
    39 ('10','ACCOUNTING','NEW YORK'),
    40 ('20','RESEARCH','DALLAS'),
    41 ('30','SALES','CHICAGO'),
    42 ('40','OPERATIONS','BOSTON');

    2、我将要插入的数据:

     1 7369,SMITH,CLERK,7902,1980-12-17,800,null,20
     2 7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
     3 7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
     4 7566,JONES,MANAGER,7839,1981-04-02,2975,null,20),
     5 7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
     6 7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30
     7 7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10
     8 7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20
     9 7839,KING,PRESIDENT,null,1981-11-17,5000,null,10
    10 7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30
    11 7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20
    12 7900,JAMES,CLERK,7698,1981-12-03,950,null,30
    13 7902,FORD,ANALYST,7566,1981-12-03,3000,null,20
    14 7934,MILLER,CLERK,7782,1982-01-23,1300,null,10

    3、先将数据文件通过命令 hadoop fs -put  上传到HDFS中去。

    4、编写Java代码

     1 import org.apache.hadoop.conf.Configuration;
     2 import org.apache.hadoop.fs.FSDataInputStream;
     3 import org.apache.hadoop.fs.FileSystem;
     4 import org.apache.hadoop.fs.Path;
     5 
     6 import java.io.BufferedReader;
     7 import java.io.InputStreamReader;
     8 import java.net.URI;
     9 import java.sql.Connection;
    10 import java.sql.DriverManager;
    11 import java.sql.Statement;
    12 
    13 public class hdfsDemo {
    14     public static void main(String[] args) throws Exception{
    15 
    16         //hdfs的配置
    17         URI uri = new URI("hdfs://192.168.230.50:9000");
    18         Configuration conf = new Configuration();
    19         FileSystem fs = FileSystem.get(uri,conf);
    20 
    21         //mysql的配置
    22         Class.forName("com.mysql.jdbc.Driver");
    23         Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.230.50:3306/test", "root", "123456");
    24         System.out.println(conn);
    25         Statement st = conn.createStatement();
    26 
    27         FSDataInputStream fdis = fs.open(new Path("/usr/test/empldata.csv"));
    28         BufferedReader br = new BufferedReader(new InputStreamReader(fdis));
    29         String line = null;
    30         while((line = br.readLine())!=null){
    31             String[] split = line.split(",");
    32             String EMPNO = split[0];
    33             String ENAME = split[1];
    34             String JOB = split[2];
    35             String MGR = split[3];
    36             String HIREDATE = split[4];
    37             String SAL = split[5];
    38             String COMM = split[6];
    39             String DEPTNO = split[7];
    40 
    41             String sql = "insert into test.emp(EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO) values " +
    42                     "("+"'"+EMPNO+"'"+","+"'"+ENAME+"'"+","+"'"+JOB+"'"+","+"'"+MGR+"'"+","+"'"+HIREDATE+"'"+","+"'"+SAL+"'"+","+"'"+COMM+"'"+","+"'"+DEPTNO+"'"+")";
    43             System.out.println(sql);
    44             st.execute(sql);
    45         }
    46 
    47 
    48         fdis.close();
    49         conn.close();
    50 
    51 
    52 
    53     }
    54 }

    八、数据存储->读文件(重要!!!

     读文件流程分析:

    1.首先调用FileSystem对象的open方法,其实是一个DistributedFileSystem的实例

    2.DistributedFileSystem通过rpc获得文件的第一个block的locations,同一block按照副本数会返回多个locations,这些locations按照hadoop拓扑结构排序,距离客户端近的排在前面.

    3.前两步会返回一个FSDataInputStream对象,该对象会被封装成DFSInputStream对象,DFSInputStream可以方便的管理datanode和namenode数据流。客户端调用read方法,DFSInputStream最会找出离客户端最近的datanode并连接。

    4.数据从datanode源源不断的流向客户端。

    5.如果第一块的数据读完了,就会关闭指向第一块的datanode连接,接着读取下一块。这些操作对客户端来说是透明的,客户端的角度看来只是读一个持续不断的流。

    6.如果第一批block都读完了,DFSInputStream就会去namenode拿下一批blocks的location,然后继续读,如果所有的块都读完,这时就会关闭掉所有的流。

    如果在读数据的时候,DFSInputStream和datanode的通讯发生异常,就会尝试正在读的block的排第二近的datanode,并且会记录哪个datanode发生错误,剩余的blocks读的时候就会直接跳过该datanode。DFSInputStream也会检查block数据校验和,如果发现一个坏的block,就会先报告到namenode节点,然后DFSInputStream在其他的datanode上读该block的镜像

    该设计的方向就是客户端直接连接datanode来检索数据并且namenode来负责为每一个block提供最优的datanode,namenode仅仅处理block location的请求,这些信息都加载在namenode的内存中,hdfs通过datanode集群可以承受大量客户端的并发访问。

    九、数据存储->写文件(重要!!!

    1.客户端通过调用DistributedFileSystem的create方法创建新文件

    2.DistributedFileSystem通过RPC调用namenode去创建一个没有blocks关联的新文件,创建前,namenode会做各种校验,比如文件是否存在,客户端有无权限去创建等。如果校验通过,namenode就会记录下新文件,否则就会抛出IO异常.

    3.前两步结束后会返回FSDataOutputStream的对象,象读文件的时候相似,FSDataOutputStream被封装成DFSOutputStream.DFSOutputStream可以协调namenode和datanode。客户端开始写数据到DFSOutputStream,DFSOutputStream会把数据切成一个个小packet,然后排成队列data quene。 4.DataStreamer会去处理接受data queue,他先问询namenode这个新的block最适合存储的在哪几个datanode里,比如副本数是3,那么就找到3个最适合的datanode,把他们排成一个pipeline.DataStreamer把packet按队列输出到管道的第一个datanode中,第一个datanode又把packet输出到第二个datanode中,以此类推。

    5.DFSOutputStream还有一个对列叫ack queue,也是有packet组成,等待datanode的收到响应,当pipeline中的所有datanode都表示已经收到的时候,这时akc queue才会把对应的packet包移除掉。 如果在写的过程中某个datanode发生错误,会采取以下几步:1) pipeline被关闭掉;2)为了防止丢包ack queue里的packet会同步到data queue里;3)把产生错误的datanode上当前在写但未完成的block删掉;4)block剩下的部分被写到剩下的两个正常的datanode中;5)namenode找到另外的datanode去创建这个块的复制。当然,这些操作对客户端来说是无感知的。

    6.客户端完成写数据后调用close方法关闭写入流

    7.DataStreamer把剩余得包都刷到pipeline里然后等待ack信息,收到最后一个ack后,通知datanode把文件标示为已完成。

  • 相关阅读:
    vs2003无法打开或创建Web应用程序解决办法(HTTP/1.1 500server error错误处理方法)
    【宋红康学习日记1】关于环境变量设置出现的问题——找不到或无法加载主类 java
    【宋红康学习日记2】简单的语法知识
    【宋红康学习日记5】数组
    【宋红康学习日记4】流程控制
    【宋红康程序思想学习日记3】杨辉三角
    【宋红康程序思想学习日记1】运用位运算思想实现两个数的互换
    noaman日志第一条:20151024;“Hello.World”
    【宋红康学习日记3】运算符
    【宋红康程序思想学习日记4】数组简单操作
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12050842.html
Copyright © 2011-2022 走看看