  • C++ program development on Hadoop

    Hadoop programs can be written in C++. One way to run them is Hadoop Pipes, for example: hadoop pipes -conf job_config.xml -input input/myfile.txt -output output -program bin/wordcount

    In addition, there is a Streaming mode (Hadoop Streaming), which runs any executable that reads records from stdin and writes results to stdout.
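    A rough sketch of a Streaming invocation (the jar's exact name and location vary by Hadoop release, so this path is an assumption; the mapper and reducer can be any executables that read stdin and write stdout):

    bin/hadoop jar contrib/streaming/hadoop-streaming.jar \
        -input /input/test -output /output/streaming-out \
        -mapper /bin/cat -reducer /usr/bin/wc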

    Java programs, by contrast, are packaged into a jar and run with the hadoop jar command, e.g. "hadoop jar program.jar mainclass arguments"

    A more detailed write-up quoted from the web follows:

    HCE, short for Hadoop C++ Extension

    It is said to be more than 20% faster than stock Hadoop. I plan to benchmark it with an inverted-index job in a few days, tentatively on 3 nodes with 16 CPU cores each.


    It took about a day and a half to learn how to deploy Hadoop and HCE. I successfully set up pseudo-distributed HCE on CentOS 5.4, submitted a WordCount MapReduce program I had compiled myself, and got correct results.


    Configuration process and the problems encountered:

    After downloading the HCE source, the build failed with the following errors:

    1. Redundant name qualification: HCE::Compressor. Fix: remove the extra HCE:: qualification in the code.

    Code location: src/c++/hce/impl/Compressor


    2. Undefined symbol: htons. Fix: change the headers being included. Do not use the platform-specific headers under linux/; comment out
    #include <linux/in.h>
    #include <linux/in6.h>
    and add #include <netinet/in.h> instead.
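    Shown as an edit sketch of the affected includes:

    // #include <linux/in.h>     // removed: platform-specific kernel header
    // #include <linux/in6.h>    // removed
    #include <netinet/in.h>      // added: POSIX header that declares htons/ntohs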

    At link time you may hit an error about -lncurses not being found.
    Install ncurses-devel; on CentOS it can be installed with yum.
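    For example, as root (or via sudo):

    yum install ncurses-devel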

    A successful build produces a set of files under the build/ directory.


    Next comes the configuration and run phase:

    Configure core-site.xml, mapred-site.xml, and hdfs-site.xml under conf/.

    These mainly specify the IP address and port of each service; the Hadoop daemons will listen on the configured addresses.
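    A minimal pseudo-distributed sketch of those three files (the property names are the stock Hadoop 0.20 ones; the localhost addresses and ports are assumptions, and each <property> goes inside the file's <configuration> element):

    <!-- core-site.xml -->
    <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000</value>
    </property>

    <!-- hdfs-site.xml -->
    <property>
      <name>dfs.replication</name>
      <value>1</value>
    </property>

    <!-- mapred-site.xml -->
    <property>
      <name>mapred.job.tracker</name>
      <value>localhost:9001</value>
    </property>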

    During the run phase it is common for some daemon to fail to start, and there are many possible causes. A tedious but safer approach is recommended: start the services one by one, in order.

    First, format HDFS: bin/hadoop namenode -format

    Then start the daemons in order. Hadoop has four main daemons: namenode, datanode, jobtracker, and tasktracker.

    Start them in this order:

    bin/hadoop-daemon.sh start namenode

    bin/hadoop-daemon.sh start datanode

    bin/hadoop-daemon.sh start jobtracker

    bin/hadoop-daemon.sh start tasktracker

    You can check the files under logs/ as each daemon starts to verify that it came up successfully.
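    Another quick check is the JDK's jps tool; all four daemons should show up (the PIDs below are placeholders):

    $ jps
    4961 NameNode
    5009 DataNode
    5057 JobTracker
    5105 TaskTracker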


    Once everything is running, use the bin/hadoop fs family of commands to create the input/output directories and upload the input file to HDFS.
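    For example (paths chosen to match the job command used later; the job itself will create /output/out1, which must not already exist):

    bin/hadoop fs -mkdir /input
    bin/hadoop fs -put test /input/test
    bin/hadoop fs -ls /input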


    Now it is time to write our C++ version of the WordCount MapReduce program. The code is as follows:

    #include "hadoop/Hce.hh"

    class WordCountMap: public HCE::Mapper {
    public:
    HCE::TaskContext::Counter* inputWords;
    int64_t setup() {
    inputWords = getContext()->getCounter("WordCount",
    "Input Words");
    return 0;
    }
    int64_t map(HCE::MapInput &input) {
    int64_t size = 0;
    const void* value = input.value(size);
    if ((size > 0) && (NULL != value)) {
    char* text = (char*)value;
    const int n = (int)size;
    for (int i = 0; i < n;) {
    // Skip past leading whitespace
    while ((i < n) && isspace(text[i])) i++;
    // Find word end
    int start = i;
    while ((i < n) && !isspace(text[i])) i++;
    if (start < i) {
    emit(text + start, i-start, "1", 1);
    getContext()->incrementCounter(inputWords, 1);
    }
    }
    }
    return 0;
    }
    int64_t cleanup() {
    return 0;
    }
    };

    const int INT64_MAXLEN = 25;
    int64_t toInt64(const char *val) {
    int64_t result;
    char trash;
    int num = sscanf(val, "%ld%c", &result, &trash);
    return result;
    }
    class WordCountReduce: public HCE::Reducer {
    public:
    HCE::TaskContext::Counter* outputWords;
    int64_t setup() {
    outputWords = getContext()->getCounter("WordCount",
    "Output Words");
    return 0;
    }
    int64_t reduce(HCE::ReduceInput &input) {
    int64_t keyLength;
    const void* key = input.key(keyLength);
    int64_t sum = 0;
    while (input.nextValue()) {
    int64_t valueLength;
    const void* value = input.value(valueLength);
    sum += toInt64((const char*)value);
    }
    char str[INT64_MAXLEN];
    int str_len = snprintf(str, INT64_MAXLEN, "%ld", sum);
    getContext()->incrementCounter(outputWords, 1);
    emit(key, keyLength, str, str_len);
    }
    int64_t cleanup() {
    return 0;
    }
    };

    int main(int argc, char *argv[]) {
    return HCE::runTask(
    //TemplateFactory sequence is Mapper, Reducer,
    // Partitioner, Combiner, Committer,
    // RecordReader, RecordWriter
    HCE::TemplateFactory<WordCountMap, WordCountReduce,
    void, void, void, void, void>()
    );
    }

    The Makefile is as follows:

    HADOOP_HOME = ../hadoop-0.20.3/build
    JAVA_HOME = ../java6

    CXX = g++
    RM = rm -f
    INCLUDEDIR = -I${HADOOP_HOME}/c++/Linux-amd64-64/include
    LIBDIR = -L${HADOOP_HOME}/c++/Linux-amd64-64/lib \
             -L${JAVA_HOME}/jre/lib/amd64/server
    CXXFLAGS = ${INCLUDEDIR} -g -Wextra -Werror \
               -Wno-unused-parameter -Wformat \
               -Wconversion -Wdeprecated
    LDLIBS = ${LIBDIR} -lhce -lhdfs -ljvm

    all : wordcount-demo

    # recipe lines below must be indented with a real tab
    wordcount-demo : wordcount-demo.o
    	$(CXX) -o $@ $^ $(LDLIBS) $(CXXFLAGS)

    clean:
    	$(RM) *.o wordcount-demo
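    With this Makefile saved next to the source (the source file name wordcount-demo.cc is an assumption; GNU make's implicit %.o rule compiles it with CXXFLAGS), the build is simply:

    make            # compiles wordcount-demo.o and links ./wordcount-demo
    make clean      # removes the object file and the binary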


    After it compiles successfully, you can submit the HCE job:

    bin/hadoop hce -input /input/test -output /output/out1 -program wordcount-demo -file wordcount-demo -numReduceTasks 1
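    When the job finishes, the result can be read straight from HDFS. part-00000 is the usual output file name for a single reduce task, so the exact name here is an assumption about the default RecordWriter:

    bin/hadoop fs -cat /output/out1/part-00000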

    The input file /input/test used here has the following content:

    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
    The quick brown fox jumps over the lazy dog.
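    Since the mapper splits on whitespace only (the trailing period stays attached to "dog.", and "The"/"the" remain distinct keys), each of the nine tokens should end up with a count of 10. Assuming plain text output with tab-separated key and value and keys in byte order, the result should look roughly like:

    The     10
    brown   10
    dog.    10
    fox     10
    jumps   10
    lazy    10
    over    10
    quick   10
    the     10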


    After submitting the job, you may run into the error: job not successful

    The logs contain the following error messages:

    stderr logs:

    ..........

    HCE_FATAL 08-10 12:13:51 [/home/shengeng/hce/hadoop_hce_v1/hadoop-0.20.3/src/c++/hce/impl/MapRed/Hce.cc][176][runTask] error when parsing UgiInfo at /home/shengeng/hce/hadoop_hce_v1/hadoop-0.20.3/src/c++/hce/impl/MapRed/HadoopCommitter.cc:247 in virtual bool HCE::HadoopCommitter::needsTaskCommit()

    syslog logs:

    .......................

    2011-08-10 12:13:51,450 ERROR org.apache.hadoop.mapred.hce.BinaryProtocol: java.io.EOFException
        at java.io.DataInputStream.readByte(DataInputStream.java:250)
        at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298)
        at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319)
        at org.apache.hadoop.mapred.hce.BinaryProtocol$UplinkReaderThread.run(BinaryProtocol.java:112)
    2011-08-10 12:13:51,450 ERROR org.apache.hadoop.mapred.hce.Application: Aborting because of java.io.EOFException
        at java.io.DataInputStream.readByte(DataInputStream.java:250)
        at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298)
        at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319)
        at org.apache.hadoop.mapred.hce.BinaryProtocol$UplinkReaderThread.run(BinaryProtocol.java:112)
    2011-08-10 12:13:51,450 INFO org.apache.hadoop.mapred.hce.BinaryProtocol: Sent abort command
    2011-08-10 12:13:51,496 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
    java.io.IOException: hce child exception
        at org.apache.hadoop.mapred.hce.Application.abort(Application.java:325)
        at org.apache.hadoop.mapred.hce.HceMapRunner.run(HceMapRunner.java:87)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:369)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)
    Caused by: java.io.EOFException
        at java.io.DataInputStream.readByte(DataInputStream.java:250)
        at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:298)
        at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:319)
        at org.apache.hadoop.mapred.hce.BinaryProtocol$UplinkReaderThread.run(BinaryProtocol.java:112)
    2011-08-10 12:13:51,500 INFO org.apache.hadoop.mapred.TaskRunner: Runnning cleanup for the task

    From the log we can locate the offending code. In HadoopCommitter.cc, bool HadoopCommitter::needsTaskCommit() contains:

    string ugiInfo = taskContext->getJobConf()->get("hadoop.job.ugi"); // looks up the hadoop.job.ugi entry, which the default HCE configuration files do not define
    words = HadoopUtils::splitString(ugiInfo, ",");
    HADOOP_ASSERT(words.size() == 2, "error when parsing UgiInfo");    // so the assertion fails and the exception above is thrown here

    The fix is to add the configuration item to hdfs-site.xml:

    <property>
      <name>hadoop.job.ugi</name>
      <value>hadoop,supergroup</value>
    </property>

    Reading the code further, one can infer that this configuration item does not actually take effect in HCE: needsTaskCommit() merely reads and parses it but never uses its value.


     
