  • Hadoop Cluster Setup and MapReduce Application

    I. Hadoop Cluster Setup and Configuration

    1. Node Preparation

    Cluster plan:

    Hostname     IP             Installed software        Running processes
    weekend01    192.168.1.60   jdk, hadoop               NameNode, DFSZKFailoverController
    weekend02    192.168.1.61   jdk, hadoop               NameNode, DFSZKFailoverController
    weekend03    192.168.1.62   jdk, hadoop               ResourceManager
    weekend04    192.168.1.63   jdk, hadoop               ResourceManager
    weekend05    192.168.1.64   jdk, hadoop, zookeeper    DataNode, NodeManager, JournalNode, QuorumPeerMain
    weekend06    192.168.1.65   jdk, hadoop, zookeeper    DataNode, NodeManager, JournalNode, QuorumPeerMain
    weekend07    192.168.1.66   jdk, hadoop, zookeeper    DataNode, NodeManager, JournalNode, QuorumPeerMain
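
    These hostnames are assumed to resolve to the IPs above on every node, for example via /etc/hosts entries like the following (added on all seven machines):

    192.168.1.60 weekend01
    192.168.1.61 weekend02
    192.168.1.62 weekend03
    192.168.1.63 weekend04
    192.168.1.64 weekend05
    192.168.1.65 weekend06
    192.168.1.66 weekend07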

    2. Java Environment Installation

    This part covers the installation of Java and the related environment configuration.

    Upload the JDK package and the Hadoop-related packages, then extract them, as shown below:

    [hadoop@weekend01 src]$ pwd

    /usr/local/src

    [hadoop@weekend01 src]$ ls

    hadoop-2.6.4 jdk1.8.0_92 zookeeper-3.4.6

    hbase-1.2.1 mongodb-linux-x86_64-rhel62-3.2.6 zookeeper-3.4.6.tar.gz

    Grant the hadoop user permission to edit the /etc/profile file (this step is optional).
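
    One way to grant it (a sketch, assuming the filesystem supports POSIX ACLs; simply editing the file as root works just as well) is:

    [root@weekend01 ~]# setfacl -m u:hadoop:rw /etc/profile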

    Add the following to /etc/profile:

    export JAVA_HOME=/usr/local/src/jdk1.8.0_92

    export HADOOP_HOME=/usr/local/src/hadoop-2.6.4

    export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH

    export HBASE_HOME=/usr/local/src/hbase-1.2.1

    export ZK_HOME=/usr/local/src/zookeeper-3.4.6

    export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$PATH:$ZK_HOME/bin

    Save and exit, then run [hadoop@weekend01 ~]$ source /etc/profile (if the new settings do not take effect, a reboot also works). Then verify the Java environment and the hadoop command:

    [hadoop@weekend01 ~]$ java -version

    java version "1.8.0_92"

    Java(TM) SE Runtime Environment (build 1.8.0_92-b14)

    Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

    [hadoop@weekend01 ~]$ hadoop version

    Hadoop 2.6.4

    Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6

    Compiled by jenkins on 2016-02-12T09:45Z

    Compiled with protoc 2.5.0

    From source with checksum 8dee2286ecdbbbc930a6c87b65cbc010

    This command was run using /usr/local/src/hadoop-2.6.4/share/hadoop/common/hadoop-common-2.6.4.jar

    At this point the JDK and Hadoop environments are configured successfully.

    3. SSH Configuration

    The Hadoop control scripts rely on SSH to run operations across the entire cluster, so passwordless SSH is needed for them to work seamlessly. This part describes the SSH configuration.

    [hadoop@weekend01 ~]$ ssh-keygen

    Press Enter at every prompt to accept the defaults.

    Then copy the public key to this node itself and to the other six nodes:

    [hadoop@weekend01 ~]$ ssh-copy-id weekend01
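
    The same command is repeated for weekend02 through weekend07. A minimal sketch that distributes the key to all seven nodes in one loop (assuming the hadoop account exists on every node and the hostnames resolve):

    [hadoop@weekend01 ~]$ for h in weekend01 weekend02 weekend03 weekend04 weekend05 weekend06 weekend07; do ssh-copy-id $h; done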

    Test:

    [hadoop@weekend01 ~]$ ssh weekend01

    Last login: Tue May 31 16:30:47 2016 from slave2

    [hadoop@weekend01 ~]$ exit

    logout

    Connection to weekend01 closed.

    [hadoop@weekend01 ~]$ ssh weekend02

    Last login: Tue Nov 8 11:33:42 2016 from master

    [hadoop@weekend02 ~]$ exit

    logout

    Connection to weekend02 closed.

    4. Hadoop Configuration

    This part describes the changes made to each Hadoop configuration file.

    [hadoop@weekend01 hadoop]$ cat hdfs-site.xml

    <configuration>

    <property>

    <name>dfs.namenode.name.dir</name>

    <value>/weekend01/namenode/dir</value>

    </property>

    <property>

    <name>dfs.blocksize</name>

    <value>268435456</value>

    </property>

    <property>

    <name>dfs.namenode.handler.count</name>

    <value>100</value>

    </property>

    <property>

    <name>dfs.datanode.data.dir</name>

    <value>/weekend01.datanode/dir</value>

    </property>

    </configuration>
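
    HDFS expects the directories configured above to be writable by the hadoop user on the nodes that use them; a minimal sketch of preparing them (run as root, reusing the paths from the listing) would be:

    [root@weekend01 ~]# mkdir -p /weekend01/namenode/dir /weekend01.datanode/dir
    [root@weekend01 ~]# chown -R hadoop:hadoop /weekend01/namenode/dir /weekend01.datanode/dir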

    [hadoop@weekend01 hadoop]$ cat -n hadoop-env.sh

    26 export JAVA_HOME=/usr/local/src/jdk1.8.0_92

    [hadoop@weekend01 hadoop]$ cat core-site.xml

    <configuration>

    <!-- Specify the default HDFS filesystem (the weekend01 NameNode) -->

    <property>

    <name>fs.defaultFS</name>

    <value>hdfs://192.168.1.60:9000</value>

    </property>

    <property>

    <name>io.file.buffer.size</name>

    <value>131072</value>

    </property>

    <!-- Specify the Hadoop temporary directory -->

    <!--<property>

    <name>hadoop.tmp.dir</name>

    <value>/home/hadoop/app/hadoop-2.4.1/tmp</value>

    </property>

    -->

    <!-- Specify the ZooKeeper quorum addresses -->

    <!--<property>

    <name>ha.zookeeper.quorum</name>

    <value>weekend05:2181,weekend06:2181,weekend07:2181</value>

    </property>-->

    </configuration>

    [hadoop@weekend01 hadoop]$ cat mapred-site.xml

    <configuration>

    <!-- Run MapReduce on the YARN framework -->

    <property>

    <name>mapreduce.framework.name</name>

    <value>yarn</value>

    </property>

    </configuration>

    [hadoop@weekend01 hadoop]$ cat slaves

    weekend05

    weekend06

    weekend07

    [hadoop@weekend01 hadoop]$ cat yarn-site.xml

    <configuration>

    <!-- Site specific YARN configuration properties -->

    <property>

    <name>yarn.acl.enable</name>

    <value>false</value>

    </property>

    <property>

    <name>yarn.admin.acl</name>

    <value>*</value>

    </property>

    <property>

    <name>yarn.log-aggregation-enable</name>

    <value>false</value>

    </property>

    <property>

    <name>yarn.resourcemanager.address</name>

    <value>192.168.1.60:9000</value>

    </property>

    <property>

    <name>yarn.resourcemanager.scheduler.address</name>

    <value>192.168.1.60:9000</value>

    </property>

    <property>

    <name>yarn.resourcemanager.resource-tracker.address</name>

    <value>192.168.1.60:9000</value>

    </property>

    <property>

    <name>yarn.resourcemanager.admin.address</name>

    <value>192.168.1.60:9000</value>

    </property>

    <property>

    <name>yarn.resourcemanager.webapp.address</name>

    <value>192.168.1.60:9000</value>

    </property>

    <property>

    <name>yarn.resourcemanager.hostname</name>

    <value>weekend01</value>

    </property>

    <property>

    <name>yarn.resourcemanager.scheduler.class</name>

    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>

    </property>

    </configuration>
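
    The listing above does not include the NodeManager auxiliary shuffle service, which MapReduce on YARN normally requires so that reduce tasks can fetch map output; a minimal addition (an assumption, not shown in the original configuration) would be:

    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    </property>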

    5. Hadoop Testing

    This part tests the Hadoop cluster.

    Format the HDFS filesystem on weekend01:

    [hadoop@weekend01 ~]$ hdfs namenode -format

    …..

    16/11/08 11:00:37 INFO common.Storage: Storage directory /tmp/hadoop-hadoop/dfs/name has been successfully formatted # this line indicates that the format succeeded

    …….

    Then start the ZooKeeper service and the JournalNode process on weekend05, weekend06, and weekend07:

    [hadoop@weekend05 ~]$ zkServer.sh start

    JMX enabled by default

    Using config: /usr/local/src/zookeeper-3.4.6/bin/../conf/zoo.cfg

    Starting zookeeper ... STARTED

    [hadoop@weekend05 ~]$ hadoop-daemon.sh start journalnode

    starting journalnode, logging to /usr/local/src/hadoop-2.6.4/logs/hadoop-hadoop-journalnode-weekend05.out

    [hadoop@weekend05 ~]$ jps # seeing these three processes confirms that the start succeeded

    4833 JournalNode

    4881 Jps

    4779 QuorumPeerMain
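
    The same two commands are run on weekend06 and weekend07 as well. Since the JournalNodes live on exactly the hosts listed in the slaves file, an alternative (a sketch, assuming the passwordless SSH set up earlier) is to start them all from weekend01 in one step:

    [hadoop@weekend01 ~]$ hadoop-daemons.sh start journalnode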

    Then format the ZKFC state in ZooKeeper on weekend01:

    [hadoop@weekend01 ~]$ hdfs zkfc -formatZK

    [hadoop@weekend01 ~]$ jps

    4420 SecondaryNameNode

    5148 Jps

    4141 NameNode

    [hadoop@weekend03 ~]$ start-yarn.sh

    [hadoop@weekend03 ~]$ jps

    4837 ResourceManager

    4902 Jps

    [hadoop@weekend01 ~]$ start-dfs.sh

    [hadoop@weekend05 ~]$ jps

    4833 JournalNode

    4948 DataNode

    5530 Jps

    4779 QuorumPeerMain

    5403 NodeManager

    [hadoop@weekend06 ~]$ jps

    2464 QuorumPeerMain

    5448 NodeManager

    2520 JournalNode

    4953 DataNode

    5581 Jps

    [hadoop@weekend07 ~]$ jps

    4864 QuorumPeerMain

    5713 Jps

    5581 NodeManager

    5086 DataNode

    4974 JournalNode

    II. MapReduce Application

    1. Application Description

    This experiment uses Hadoop to compute statistics over the data and to remove duplicates. Because the cluster is configured for high availability, it avoids a single point of failure and can effectively prevent a cluster outage caused by the failure of a single NameNode.

    2. Data Preparation

    Open a terminal and enter the following command:

    bin/hadoop fs -mkdir hdfsInput

    This command may fail with a safe-mode related message; if it does, run

    bin/hadoop dfsadmin -safemode leave

    to leave safe mode.
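
    Since this cluster runs Hadoop 2.6.4, the same operation can also be issued through the hdfs tool, which is where dfsadmin lives in 2.x:

    bin/hdfs dfsadmin -safemode leave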

    This creates an input directory on HDFS; the files we work with later must be uploaded into this directory before a job can be run on them.

    Then enter the following commands in the terminal, in order:

    cd hadoop-1.2.1

    bin/hadoop fs -put file/myTest*.txt hdfsInput
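
    To confirm the upload (an extra check, not part of the original steps), list the directory:

    bin/hadoop fs -ls hdfsInput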

    3. Design Approach

    1) The input files are split into splits. Because the test files are small, each file forms one split. Each split is then broken into <key,value> pairs line by line, where the key is the byte offset of the line (line terminators included) and the value is the text of the line. This step is performed automatically by the MapReduce framework, as shown in the figure below:

    [Figure: input text split into <key,value> pairs by line offset]

    2) The split <key,value> pairs are handed to the user-defined map method, which produces new <key,value> pairs, as shown in the figure below:

    [Figure: <word,1> pairs emitted by the map method]

    3) After the map method's output is produced, the Mapper sorts the pairs by key and runs the Combine step, which adds up the values of identical keys to produce the Mapper's final output, as shown in the figure below:

    [Figure: Mapper output after sorting and combining]

    4) The Reducer first sorts the data received from the Mappers, then passes it to the user-defined reduce method, which produces new <key,value> pairs that form the WordCount output, as shown in the figure below:

    [Figure: final output of the reduce method]
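
    As a concrete illustration of the four steps above (a hypothetical two-line input in a single split, not taken from the original figures):

    Input lines:    "Hello World" and "Hello Hadoop"
    Map output:     <Hello,1> <World,1> <Hello,1> <Hadoop,1>
    Combine output: <Hadoop,1> <Hello,2> <World,1>  (sorted by key, equal keys summed)
    Reduce output:  Hadoop 1, Hello 2, World 1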

    4. Program Code

    package com.felix;

    import java.io.IOException;

    import java.util.Iterator;

    import java.util.StringTokenizer;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapred.FileInputFormat;

    import org.apache.hadoop.mapred.FileOutputFormat;

    import org.apache.hadoop.mapred.JobClient;

    import org.apache.hadoop.mapred.JobConf;

    import org.apache.hadoop.mapred.MapReduceBase;

    import org.apache.hadoop.mapred.Mapper;

    import org.apache.hadoop.mapred.OutputCollector;

    import org.apache.hadoop.mapred.Reducer;

    import org.apache.hadoop.mapred.Reporter;

    import org.apache.hadoop.mapred.TextInputFormat;

    import org.apache.hadoop.mapred.TextOutputFormat;

    /**

    *

    * Description: WordCount, explained by Felix

    * @author Hadoop Dev Group

    */

    public class WordCount

    {

    /**

    * The MapReduceBase class: a base class that implements the Mapper and Reducer interfaces (its methods only satisfy the interfaces and do nothing).

    * The Mapper interface:

    * The WritableComparable interface: classes that implement WritableComparable can be compared with one another; every class used as a key should implement it.

    * Reporter can be used to report the progress of the whole application; it is not used in this example.

    *

    */

    public static class Map extends MapReduceBase implements

    Mapper<LongWritable, Text, Text, IntWritable>

    {

    /**

    * LongWritable, IntWritable, and Text are Hadoop classes that wrap Java data types. They implement the WritableComparable interface,

    * so they can be serialized for data exchange in a distributed environment; think of them as replacements for long, int, and String.

    */

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    /**

    * The map method of the Mapper interface:

    * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)

    * maps a single input k/v pair to an intermediate k/v pair.

    * The output pair does not need to have the same types as the input pair; an input pair may map to zero or more output pairs.

    * The OutputCollector interface collects the <k,v> pairs emitted by Mappers and Reducers.

    * Its collect(k, v) method adds a (k, v) pair to the output.

    */

    public void map(LongWritable key, Text value,

    OutputCollector<Text, IntWritable> output, Reporter reporter)

    throws IOException

    {

    String line = value.toString();

    StringTokenizer tokenizer = new StringTokenizer(line);

    while (tokenizer.hasMoreTokens())

    {

    word.set(tokenizer.nextToken());

    output.collect(word, one);

    }

    }

    }

    public static class Reduce extends MapReduceBase implements

    Reducer<Text, IntWritable, Text, IntWritable>

    {

    public void reduce(Text key, Iterator<IntWritable> values,

    OutputCollector<Text, IntWritable> output, Reporter reporter)

    throws IOException

    {

    int sum = 0;

    while (values.hasNext())

    {

    sum += values.next().get();

    }

    output.collect(key, new IntWritable(sum));

    }

    }

    public static void main(String[] args) throws Exception

    {

    /**

    * JobConf: the map/reduce job configuration class; it describes to the Hadoop framework how the map-reduce job should be run.

    * Constructors: JobConf(), JobConf(Class exampleClass), JobConf(Configuration conf), etc.

    */

    JobConf conf = new JobConf(WordCount.class);

    conf.setJobName("wordcount"); //设置一个用户定义的job名称

    conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类

    conf.setOutputValueClass(IntWritable.class); //为job输出设置value类

    conf.setMapperClass(Map.class); //为job设置Mapper类

    conf.setCombinerClass(Reduce.class); //为job设置Combiner类

    conf.setReducerClass(Reduce.class); //为job设置Reduce类

    conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类

    conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类

    /**

    * InputFormat describes the input of a map-reduce job.

    * setInputPaths(): sets an array of paths as the job's input list.

    * setOutputPath(): sets the path used as the job's output directory.

    */

    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf); // run the job

    }

    }
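
    A brief sketch of how a class like this is typically compiled, packaged, and submitted (the jar name and class output directory below are illustrative, not taken from the original text):

    [hadoop@weekend01 ~]$ mkdir wc_classes
    [hadoop@weekend01 ~]$ javac -classpath $(hadoop classpath) -d wc_classes WordCount.java
    [hadoop@weekend01 ~]$ jar -cvf wordcount.jar -C wc_classes/ .
    [hadoop@weekend01 ~]$ hadoop jar wordcount.jar com.felix.WordCount hdfsInput hdfsOutput

    The second listing below implements the same WordCount with the newer org.apache.hadoop.mapreduce API.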

    package com.hadoop.sample;

    import java.io.IOException;

    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {

    // Inherit from the Mapper class; the map input type is <Object,Text>

    // and the output type is <Text,IntWritable>.

    public static class Map extends Mapper<Object,Text,Text,IntWritable>{

    // one represents a single occurrence of a word

    private static IntWritable one = new IntWritable(1);

    // word holds the current token

    private Text word = new Text();

    public void map(Object key,Text value,Context context) throws IOException,InterruptedException{

    // tokenize the input line

    StringTokenizer st = new StringTokenizer(value.toString());

    while(st.hasMoreTokens()){

    word.set(st.nextToken()); // store the token in word

    context.write(word, one);

    }

    }

    }

    // Inherit from the Reducer class; the reduce input type is <Text,IntWritable>

    // and the output type is <Text,IntWritable>.

    public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{

    // result holds the frequency of the word

    private static IntWritable result = new IntWritable();

    public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{

    int sum = 0;

    // sum the values in the <key, value-list> input

    for(IntWritable val:values){

    sum += val.get();

    }

    // store the frequency in result

    result.set(sum);

    // emit the result

    context.write(key, result);

    }

    }

    /**

    * @param args

    */

    public static void main(String[] args) throws Exception{

    // TODO Auto-generated method stub

    Configuration conf = new Configuration();

    // check the command-line arguments

    String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();

    if(otherArgs.length != 2){

    System.err.println("Usage WordCount <int> <out>");

    System.exit(2);

    }

    // configure the job name

    Job job = new Job(conf,"word count");

    // configure the classes used by the job

    job.setJarByClass(WordCount.class);

    job.setMapperClass(Map.class);

    job.setCombinerClass(Reduce.class);

    job.setReducerClass(Reduce.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    }
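
    Note that in Hadoop 2.x the Job(Configuration, String) constructor used above is deprecated; an equivalent non-deprecated form (a minor variation, not in the original listing) is:

    Job job = Job.getInstance(conf, "word count");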

    5. Running Results

    Enter the following command in the terminal:

    bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput

    Note that the example jar here is from version 1.2.1; the version may differ from machine to machine, so the version number can be replaced with a * wildcard:

    bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput

    You should see a result like the following:

    [Figure: console log of the WordCount job run]

    The hadoop command starts a JVM to run the MapReduce program; it automatically picks up the Hadoop configuration and adds the class path (and its dependencies) to the Hadoop classpath. The output above is the run log of this Hadoop job. From it we can see that the job was assigned the ID job_201202292213_0002 and that there were two input files (Total input paths to process : 2); it also reports the map input/output records (record and byte counts) and the reduce input/output records.

    View the contents of the hdfsOutput directory on HDFS:

    Enter the following command in the terminal:

    bin/hadoop fs -ls hdfsOutput

    [Figure: listing of the hdfsOutput directory, showing three generated files]

    The listing above shows that three files were generated; our result is in "part-r-00000".

    Use the following command to view the contents of the result file:

    bin/hadoop fs -cat hdfsOutput/part-r-00000

    [Figure: contents of part-r-00000, the word counts]

    Note: some of the figures in this article are taken from other blog posts; this article is only a lab note. If this infringes your rights, please email 1255560195@qq.com. Do not reproduce without permission.
