zoukankan      html  css  js  c++  java
  • hadoop上C++开发两种方式的例子

    百度在使用Hadoop过程中同样发现了Hadoop因为Java语言带来的低效问题,并对Hadoop进行扩展

    而在此之前,百度也尝试了 Hadoop PIPES 和 Hadoop Streamming,但是发现这些问题:
    - 这两种方案都无法很好地控制Child JVM(Map TaskTracker和Reduce TaskTracker)内存的使用,这部分都由JVM自己控制,而你能做的就只是使用-Xmx设置内存限制而已;
    - 这两种方案都只能影响到Mapper和Reducer回调函数,而真正影响性能的sort和shuffle过程依然在Java实现的TaskTracker中执行完成;
    - 数据流问题。两种方案中,数据处理流都必须由TaskTracker流向Mapper或者Reducer然后再流回来。而无论是使用pipeline还是socket方式传递数据,都难以避免数据的移动。对于大规模数据处理,其代价是不可忽视的。

    究其根本,实际上是C++模块所承担的逻辑太少。于是百度提出了更彻底的方案,即"Hadoop C++ Extention",该方案中C++代码对Hadoop入侵得更多。它将原来TaskTracker中完成的数据处理工作都交给C++模块去完成,而只让其负责协议通信和控制等。如此一来,上面的问题就都解决了:
    - TaskTracker JVM只负责少量通信工作,其内存需求很小并且可以预见,从而容易控制,譬如设为-Xmx100m就足够了;
    - sort和shuffle过程都使用C++模块实现,性能得到提高;
    - 数据在其整个生命周期都只在C++模块中,避免不必要的移动。
    这就犹如将C++模块的战线往前推进了。当然,也许在很多人看来,这只是五十步与百步的区别,但是这多出来的五十步,

    两种的方式的具体例子:

    一,Pipes方式:

    首先,建立相应的目录:

    > hadoop fs –mkdir name

    > hadoop fs –mkdir name/input

    >hadoop fs –put file1.txt file2.txt name/input

    1、编写程序(wordcount.cpp

    #include<algorithm>

    #include<limits>

    #include<string>

    #include"stdint.h"

    #include"hadoop/Pipes.hh"

    #include"hadoop/TemplateFactory.hh"

    #include"hadoop/StringUtils.hh"

    usingnamespace std;

    class WordCountMapper:publicHadoopPipes::Mapper

    {

    public:

    WordCountMapper(HadoopPipes::TaskContext&context){}

    void map(HadoopPipes::MapContext& context)

    {

    string line =context.getInputValue();

    vector<string>word = HadoopUtils::splitString(line," ");

    for (unsignedint i=0; i<word.size(); i++)

    {

    context.emit(word[i],HadoopUtils::toString(1));

    }

    }

    };

    class WordCountReducer:publicHadoopPipes::Reducer

    {

    public:

    WordCountReducer(HadoopPipes::TaskContext&context){}

    void reduce(HadoopPipes::ReduceContext& context)

    {

    int count = 0;

    while (context.nextValue())

    {

    count +=HadoopUtils::toInt(context.getInputValue());

    }

    context.emit(context.getInputKey(),HadoopUtils::toString(count));

    }

    };

    int main(int argc,char **argv)

    {

    returnHadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper,WordCountReducer>());

    }

    2、编写makefile

    CC = g++

    HADOOP_INSTALL =../../data/users/hadoop/hadoop/

    PLATFORM = Linux-amd64-64

    CPPFLAGS = -m64-I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

    wordcount:wordcount.cpp

    $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib-lhadooppipes -lhadooputils -lpthread -g -O2 -o $@

    3、编译程序并且放入hadoop系统

    > make wordcount

    > hadoop fs –put wordcount name/worcount

    4、编写配置文件(job_config.xml

    <?xml version="1.0"?>

    <configuration>

    <property>

    <name>mapred.job.name</name>

    <value>WordCount</value>

    </property>

    <property>

    <name>mapred.reduce.tasks</name>

    <value>10</value>

    </property>

    <property>

    <name>mapred.task.timeout</name>

    <value>180000</value>

    </property>

    <property>

    <name>hadoop.pipes.executable</name>

    <value>/user/hadoop/name/wordcount</value>

    <description> Executable path is given as"path#executable-name"

    sothat the executable will havea symlink in working directory.

    This can be used for gdbdebugging etc.

    </description>

    </property>

    <property>

    <name>mapred.create.symlink</name>

    <value>yes</value>

    </property>

    <property>

    <name>hadoop.pipes.java.recordreader</name>

    <value>true</value>

    </property>

    <property>

    <name>hadoop.pipes.java.recordwriter</name>

    <value>true</value>

    </property>

    </configuration>

    <property>
    <name>mapred.child.env</name>
    <value>LD_LIBRARY_PATH=/data/lib</value> <!--如果用到动态库: lib库的路径,要保证每台机器上都有 -->
    <description>User added environment variables for the task tracker child
    processes. Example :
    1) A=foo This will set the env variable A to foo
    2) B=$B:c This is inherit tasktracker's B env variable.
    </description>
    </property>

    <property>
    <name>mapred.cache.files</name>
    <value>/user/hadoop/name/data#data</value> <!--如果用到外部文件:hadoop上的data路径,程序中fopen("data/file.txt", "r") -->
    </property>

    5、运行程序

    > hadoop pipes -conf ./job_config.xml -input/user/hadoop/name/input/* -output /user/hadoop/name/output -program/user/hadoop/name/wordcount

    (注:output文件夹在运行前不能建立,系统会自己建立)

    这个例子很简单,只是统计词频,但是,实际的数据挖掘比较复杂,尤其涉及到中文,很多情况下要进行分词,那就要初始化一些分词句柄及空间,然后分词处理,其实可以将MapReduce程序看成普通的C++程序,要初始化东西,放到构造函数,具体处理放到MapReduce里。


    二,Streaming方式:

    1、 首先编写map程序(map.cpp

    #include <string>

    #include <iostream>

    using namespace std;

    int main()

    {

    string line;

    while(cin>>line)//如果是中文的话,用fgets(char*, int n, stdin)读进来,再分词处理

    {

    cout<<line<<" "<<1<<endl;

    }

    return 0;

    }

    >>g++ -o map map.cpp

    2、 编写reduce程序(reduce.cpp

    #include <map>

    #include <string>

    #include <iostream>

    using namespace std;

    int main()

    {

    string key;

    string value;

    map<string,int> word_count;

    map<string,int> :: iterator it;

    while(cin>>key)

    {

    cin>>value;

    it= word_count.find(key);

    if(it!= word_count.end())

    {

    ++(it->second);

    }

    else

    {

    word_count.insert(make_pair(key,1));

    }

    }

    for(it= word_count.begin(); it != word_count.end(); ++it)

    cout<<it->first<<" "<<it->second<<endl;

    return 0;

    }

    >>g++ -o reduce reduce.cpp

    3、 需要统计的文件,并提交至hadoop

    File1.txthello hadoop helloworld

    File2.txtthis is a firsthadoop

    >>hadoop fs –put File1.txt File2.txt ans

    4、 运行程序

    >>hadoop jar /data/users/hadoop/hadoop/contrib/streaming/hadoop-streaming-0.20.9.jar-file map -file reduce -input ans/* -output output1 -mapper /data/name/hadoop_streaming/map -reducer /data/name/hadoop_streaming/reduce
  • 相关阅读:
    IPC(进程间通信)
    进程和线程是什么
    虚拟内存
    寄存器是什么?内存、寄存器和存储器的区别
    计算机资源 —硬件资源分配
    如何将一个网页中自己想要的数据导入到Excel表格中
    Putty的安装和使用
    SQL中的ON DUPLICATE KEY UPDATE使用详解
    sql:主键(primary key)和唯一索引(unique index)区别
    直接扩频通信(上)理论基础
  • 原文地址:https://www.cnblogs.com/cl1024cl/p/6205576.html
Copyright © 2011-2022 走看看