在一个经典的数据架构中,Hadoop是处理复杂数据流的核心。数据从各种系统中收集而来,并汇总导入到Hadoop分布式文件系统HDFS中,然后通过MapReduce或者其它基于MapReduce封装的语言如Hive,Pig等进行处理,将处理后的数据导出即可。具体例子而言,如果一个大型网站需要做网站点击率的分析,它将多个服务器采集的页面访问日志汇总,推送至HDFS中,启动MapReduce作业,接下来数据将被解析,汇总以及IP地址进行关联计算,生成的结果可以导入到关系型数据库中。
启动Hadoop
在一台已经安装Hadoop并配置了环境变量的机器中启动hadoop。
su hadoop
#进入hadoop命令模式hadoop namenode -format
#初始化hadoop:hadoop部署好之后需要进行格式化工作,同时初始化操作日志,因此对于第一次使用HDFS时,需要执行-format命令才可以正常使用namenode节点start-all.sh
#启动hadoopjps
#使用jps检查是否启动进程
使用Hadoop shell命令导入导出数据到HDFS
- HDFS提供shell命令实现访问文件系统的功能,shell脚本名称为hadoop,通常安装在
$HADOOP_BIN
目录下,将$HADOOP_BIN
配置到$PATH
环境变量中,这样所有命令都可以通过hadoop fs -command
执行,通过hadoop fs -help command
获得某个命令的具体说明。 hadoop fs -mkdir -p /data/weblogs
#在HDFS创建名为weblogs的新文件夹,-p
表示级联创建(在创建目录weblogs时,若data不存在,顺带创建data目录)hadoop fs -copyFromLocal <localsrc> URI
将文件从本地文件系统复制到HDFS目标文件夹下。e.g.:hadoop fs -copyFromLocal weblogs.txt /data/weblogs
hadoop fs -ls <args>
args为文件,列出文件状态;args为目录列出目录下的文件- 工作原理:Hadoop shell轻量地封装在HDFS FileSystem API之上。在执行hadoop命令时,如果传进去的参数是
fs
,实际执行的是org.apache.hadoop.fs.FsShell这个类,FsShell实例化了一个org.apache.hadoop.fs.FileSystem对象,并且将命令行参数与类方法映射起来。例如,执行hadoop fs -mkdir /data/weblogs
相当与调用FileSystem.mkdirs(new Path("/data/weblogs"))
- 详细的命令使用参见:hadoop fs
Pig脚本使用getmerge
命令
- 使用上面的
hadoop fs
命令的get
和copyToLocal
只能对文件进行复制,无法对文件夹进行复制,当然可以使用其getmerge
合并多个文件并下载到本地文件系统中 - 使用Pig脚本执行
getmerge
。建立weblogs_md5_group_pig.sh脚本:
weblogs = load '/data/weblogs/weblog_entries.txt' as#逐行读取HDFS上weblog_entries.txt文件 (md5:chararray, url:chararray, date:chararray, time:chararray, ip:chararray); md5_grp = group weblogs by md5 parallel 4;#按照md5值进行分组 store md5_grp into '/data/weblogs/weblogs_md5_groups.bcp';#parallel是Pig脚本用来设置reduce个数的方法,由于启动4个reduce任务,所以会在输出目录中生成4个文件
distcp实现集群间数据复制
- Hadoop分布式复制
distcp
是Hadoop集群间复制大量数据的高效工作,distcp
是通过启动MapReduce实现数据复制的。 hadoop distcp hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs
#将集群A的weblogs文件夹复制到集群B上hadoop distcp -overwrite hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs
#将集群A的weblogs文件夹复制到集群B并覆盖已存在文件hadoop distcp -update hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs
#同步集群A和集群B之间的weblogs文件夹- 实现原理:在原集群,文件夹的内容被复制为一个临时的大文件,将会启动一个只有map的MapReduce作业实现两个集群之间的数据复制。
使用Sqooq从Mysql数据库导入到HDFS
- Sqooq和distcp相似,都是构建在MapReduce之上,利用了MapReduce的并行性和容错性,与集群间复制不同,Sqooq设计通过JDBC驱动连接实现Hadoop集群与关系数据库之间的数据复制。
- 在mysql中创建logs数据库和表weblogs:
CREATE DATABASE logs; use logs; create table weblogs ( md5 VARCHAR(32), url VARCHAR(64), request_date DATE, request_time TIME, ip VARCHAR(15) ); show tables;
- 使用如下命令将logs数据库的表数据导入到HDFS中:
./sqoop import --connect jdbc:mysql://localhost:3306/logs --username root --password 123456 --table weblogs --target-dir /data/weblogs/import
- 工作原理:Sqooq连接数据库的JDBC驱动在
--connect
语句中定义,并从$SQOOP_HOME/lib
目录中加载相应的包,其中$SQOOP_HOME
为Sqooq安装的绝对路径。--username
和--password
用于验证mysql实例的权限,--target-dir
选项指定导出数据库的存放位置,-m 1
指定选定map的数量。注意:mysql.user表必须包含Hadoop集群每个节点的主机域名和相应的用户名,否则Sqooq会抛出异常。
使用Sqooq从HDFS导出到Mysql
- 创建表:
use logs; create table weblogs_from_hdfs ( md5 VARCHAR(32), url VARCHAR(64), request_date DATE, request_time TIME, ip VARCHAR(15) );
- 从HDFS导出到weblog_entries.txt文件到Mysql:
./sqoop export --connect jdbc:mysql://localhost:3306/logs --username root --password 123456 --table weblogs_from_hdfs --export-dir '/data/weblogs/weblog_entries.txt' -m 1 --fields-terminated-by ' '
- 上面的这个例子使用
--table
参数决定HDFS导出的数据被储存在哪张Mysql表中,Sqooq通过表的元数据信息,列数量和列类型来校验HDFS需要导出目录中的数据并生成相应的插入语句。导出作业可以被想象为逐行读取HDFS的文件变每行产生一个INSERT INTO
的sql语句进行插入。
本博客部分来源于实验楼