一、Hadoop集群的搭建与配置
1、节点准备
集群规划:
主机名 IP 安装的软件 运行的进程
weekend 01 192.168.1.60 jdk、hadoop NameNode、DFSZKFailoverController
weekend 02 192.168.1.61 jdk、hadoop NameNode、DFSZKFailoverController
weekend 03 192.168.1.62 jdk、hadoop ResourceManager
weekend 04 192.168.1.63 jdk、hadoop ResourceManager
weekend 05 192.168.1.64 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
weekend 06 192.168.1.65 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
weekend 07 192.168.1.66 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
2、Java环境安装
此部分主要描述Java的安装及环境配置。
上传JDK软件包及hadoop相关软件包并解压,如下:
[hadoop@weekend01 src]$ pwd
/usr/local/src
[hadoop@weekend01 src]$ ls
hadoop-2.6.4 jdk1.8.0_92 zookeeper-3.4.6
hbase-1.2.1 mongodb-linux-x86_64-rhel62-3.2.6 zookeeper-3.4.6.tar.gz
授权给hadoop用户可以编辑/etc/profile文件,(此步骤非必要)
添加如下内容至/etc/profile文件中
export JAVA_HOME=/usr/local/src/jdk1.8.0_92
export HADOOP_HOME=/usr/local/src/hadoop-2.6.4
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib$CLASSPATH
export HBASE_HOME=/usr/local/src/hbase-1.2.1
export ZK_HOME=/usr/local/src/zookeeper-3.4.6
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$PATH:$ZK_HOME/bin
保存并退出并执行[hadoop@weekend01 ~]$ source /etc/profile(如不行,reboot就可以)然后验证java环境,和hadoop命令,
[hadoop@weekend01 ~]$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
[hadoop@weekend01 ~]$ hadoop version
Hadoop 2.6.4
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6
Compiled by jenkins on 2016-02-12T09:45Z
Compiled with protoc 2.5.0
From source with checksum 8dee2286ecdbbbc930a6c87b65cbc010
This command was run using /usr/local/src/hadoop-2.6.4/share/hadoop/common/hadoop-common-2.6.4.jar
到此就成功配置了JDK和hadoop的环境
3、SSH配置
hadoop控制脚本依赖ssh来执行针对整个集群的操作,因此为了支持无缝工作,此部分主要描述SSH的配置。
[hadoop@weekend01 ~]$ ssh-keygen
下面直接回车就可以
然后将公钥分别考别到自己和其他6个节点
[hadoop@weekend01 ~]$ ssh-copy-id weekend01
测试
[hadoop@weekend01 ~]$ ssh weekend01
Last login: Tue May 31 16:30:47 2016 from slave2
[hadoop@weekend01 ~]$ exit
logout
Connection to weekend01 closed.
[hadoop@weekend01 ~]$ ssh weekend02
Last login: Tue Nov 8 11:33:42 2016 from master
[hadoop@weekwnd02 ~]$ exit
logout
Connection to weekend02 closed.
4、Hadoop配置
此部分主要描述对Hadoop各个配置文件的修改。
[hadoop@weekend01 hadoop]$ cat hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.dir</name>
<value>/weekend01/namenomde/dir</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>268435456</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>100</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/weekend01.datanode/dir</value>
</property>
</configuration>
[hadoop@weekend01 hadoop]$ cat -n hadoop-env.sh
26 export JAVA_HOME=/usr/local/src/jdk1.8.0_92
[hadoop@weekend01 hadoop]$ cat core-site.xml
<configuration>
<!-- 指定hdfs的nameservice为weekend01 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://192.168.1.60:9000</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<!-- 指定hadoop临时目录 -->
<!--<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/app/hadoop-2.4.1/tmp</value>
</property>
-->
<!-- 指定zookeeper地址 -->
<!--<property>
<name>ha.zookeeper.quorum</name>
<value>weekend05:2181,weekend06:2181,weekend07:2181</value>
</property>-->
</configuration>
[hadoop@weekend01 hadoop]$ cat mapred-site.xml
<configuration>
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>`
</property>
</configuration>
[hadoop@weekend01 hadoop]$ cat slaves
weekend05
weekend06
weekend07
[hadoop@weekend01 hadoop]$ cat yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.acl.enable</name>
<value>false</value>
</property>
<property>
<name>yarn.admin.acl</name>
<value>*</value>
</property>
<property>
<name>yarn.log-aggreationanable</name>
<value>false</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>192.168.1.60:9000</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>192.168.1.60:9000</value>
</property>
<property>
<name>yarn.resourcemanager.resourcetracker.address</name>
<value>192.168.1.60:9000</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>192.168.1.60:9000</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>192.168.1.60:9000</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>weekend01</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler,class</name>
<value>CapacityScheduler</value>
</property>
<property>
<name>yarn.</name>
<value></value>
</property>
<property>
<name>yarn.resourcemanager.</name>
<value></value>
</property>
<property>
<name>yarn.resourcemanager.</name>
<value></value>
</property>
<property>
<name>yarn.resourcemanager.</name>
<value></value>
</property>
<property>
<name>yarn.resourcemanager.</name>
<value></value>
</property>
</configuration>
5、Hadoop测试
此部分主要对Hadoop进行测试。
格式化HDFS文件系统,在weekend01上
[hadoop@weekend01 ~]$ hdfs namenode -format
…..
16/11/08 11:00:37 INFO common.Storage: Storage directory /tmp/hadoop-hadoop/dfs/name has been successfully formatted #这句话是格式化成功的标志
…….
然后在weekend05、weekend06、weekend07上分别启动zookeeper集群和journalnode进程
[hadoop@weekend05 ~]$ zkServer.sh start
JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@weekend05 ~]$ hadoop-daemon.sh start journalnode
starting journalnode, logging to /usr/local/src/hadoop-2.6.4/logs/hadoop-hadoop-journalnode-weekend05.out
[hadoop@weekend05 ~]$ jps #看到这三个进程标志着启动成功
4833 JournalNode
4881 Jps
4779 QuorumPeerMain
然后在weekend01上格式化ZKFC
[hadoop@weekend01 ~]$ hdfs zkfc -formatZK
[hadoop@weekend01 ~]$ jps
4420 SecondaryNameNode
5148 Jps
4141 NameNode
[hadoop@weekend03 ~]$ start-yarn.sh
[hadoop@weekend03 ~]$ jps
4837 ResourceManager
4902 Jps
[hadoop@weekend01 ~]$ start-dfs.sh
[hadoop@weekend05 ~]$ jps
4833 JournalNode
4948 DataNode
5530 Jps
4779 QuorumPeerMain
5403 NodeManager
[hadoop@weekend06 ~]$ jps
2464 QuorumPeerMain
5448 NodeManager
2520 JournalNode
4953 DataNode
5581 Jps
[hadoop@weekend07 ~]$ jps
4864 QuorumPeerMain
5713 Jps
5581 NodeManager
5086 DataNode
4974 JournalNode
二、MapReduce应用
1、应用描述
使用hadoop进行数据统计,并做去重处理,该实验由于采用高可用避免了集群的单点故障,可以有效避免由于namenode单点故障引起的集群崩溃
2、数据准备
呼出终端,输入下面指令:
bin/hadoop fs -mkdir hdfsInput
执行这个命令时可能会提示类似安全的问题,如果提示了,请使用
bin/hadoop dfsadmin -safemode leave
来退出安全模式。
意思是在HDFS远程创建一个输入目录,我们以后的文件需要上载到这个目录里面才能执行。
在终端依次输入下面指令:
cd hadoop-1.2.1
bin/hadoop fs -put file/myTest*.txt hdfsInput
3、设计思路
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>
对,key为偏移量(包括了回车符),value为文本行。这一步由MapReduce框架自动完成,如下图:
2)将分割好的<key,value>
对交给用户定义的map方法进行处理,生成新的<key,value>
对,如下图所示:
3)得到map方法输出的<key,value>
对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果。如下图:
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>
对,并作为WordCount的输出结果,如下图:
4、程序代码
package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
*
* 描述:WordCount explains by Felix
* @author Hadoop Dev Group
*/
public class WordCount
{
/**
* MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情)
* Mapper接口:
* WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。
* Reporter 则可用于报告整个应用的运行进度,本例中未使用。
*
*/
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable>
{
/**
* LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,
* 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* Mapper接口中的map方法:
* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
* 映射一个单个的输入k/v对到一个中间的k/v对
* 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
* OutputCollector接口:收集Mapper和Reducer输出的<k,v>对。
* OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
*/
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens())
{
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext())
{
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception
{
/**
* JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
* 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
*/
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount"); //设置一个用户定义的job名称
conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
conf.setOutputValueClass(IntWritable.class); //为job输出设置value类
conf.setMapperClass(Map.class); //为job设置Mapper类
conf.setCombinerClass(Reduce.class); //为job设置Combiner类
conf.setReducerClass(Reduce.class); //为job设置Reduce类
conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类
conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类
/**
* InputFormat描述map-reduce中对job的输入定义
* setInputPaths():为map-reduce job设置路径数组作为输入列表
* setInputPath():为map-reduce job设置路径数组作为输出列表
*/
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf); //运行一个job
}
}
package com.hadoop.sample;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
//继承mapper接口,设置map的输入类型为<Object,Text>
//输出类型为<Text,IntWritable>
public static class Map extends Mapper<Object,Text,Text,IntWritable>{
//one表示单词出现一次
private static IntWritable one = new IntWritable(1);
//word存储切下的单词
private Text word = new Text();
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
//对输入的行切词
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreTokens()){
word.set(st.nextToken());//切下的单词存入word
context.write(word, one);
}
}
}
//继承reducer接口,设置reduce的输入类型<Text,IntWritable>
//输出类型为<Text,IntWritable>
public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
//result记录单词的频数
private static IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
int sum = 0;
//对获取的<key,value-list>计算value的和
for(IntWritable val:values){
sum += val.get();
}
//将频数设置到result
result.set(sum);
//收集结果
context.write(key, result);
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage WordCount <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"word count");
//配置作业各个类
job.setJarByClass(WordCount.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
5、运行结果用户登录
在终端输入下面指令:
bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput
注意,这里的示例程序是1.2.1版本的,可能每个机器有所不一致,那么请用*通配符代替版本号
bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput
应该出现下面结果:
Hadoop命令会启动一个JVM来运行这个MapReduce程序,并自动获得Hadoop的配置,同时把类的路径(及其依赖关系)加入到Hadoop的库中。以上就是Hadoop Job的运行记录,从这里可以看到,这个Job被赋予了一个ID号:job_201202292213_0002,而且得知输入文件有两个(Total input paths to process : 2),同时还可以了解map的输入输出记录(record数及字节数),以及reduce输入输出记录。
查看HDFS上hdfsOutput目录内容:
在终端输入下面指令:
bin/hadoop fs -ls hdfsOutput
从上图中知道生成了三个文件,我们的结果在"part-r-00000"中。
使用下面指令查看结果输出文件内容
bin/hadoop fs -cat output/part-r-00000
注:文中部分图引自其他博文,此文仅作为实验笔记!如有侵犯请邮箱至1255560195@qq.com,未经允许不得转载!