  • Introduction to the HiBench algorithms


    HiBench includes 9 typical Hadoop workloads, grouped into micro benchmarks, HDFS benchmarks, web search benchmarks, machine learning benchmarks, and data analytics benchmarks.


    • micro benchmarks 
      Sort: generates its input with Hadoop RandomTextWriter and sorts it.
      Wordcount: counts the occurrences of each word in the input, which is generated with Hadoop RandomTextWriter.
      TeraSort: sorts, by key, the input data produced by Hadoop TeraGen.

    • hdfs benchmarks 

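    • web search benchmarks 
      Nutch indexing: large-scale search engine indexing. This workload tests the indexing subsystem of Nutch (an open-source Apache search engine). It uses automatically generated web data in which the hyperlinks and words follow a Zipfian distribution (the number of times a word occurs is inversely proportional to its rank in the frequency table).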

    • machine learning benchmarks 
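      Mahout bayesian classification (bayes): large-scale machine learning. This workload tests the naive Bayesian trainer in Mahout (an open-source Apache machine learning library). The input is a set of automatically generated documents whose words follow a Zipfian distribution.
      Mahout k-means clustering (kmeans): tests the k-means clustering algorithm in Mahout. The input data set is produced by GenKMeansDataset, based on uniform and Gaussian distributions.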

    • data analytics benchmarks 
      Hive query benchmarks (hivebench): Hive queries (aggregation and join) that perform typical OLAP queries on automatically generated web data whose hyperlinks follow a Zipfian distribution.
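
    Several of the workloads above rely on synthetic text whose word frequencies follow a Zipfian distribution. The following is a minimal sketch of that idea, not HiBench's own generator: the vocabulary, the exponent of 1, and the function names `zipf_weights` and `sample_words` are assumptions made for illustration.

```python
import random
from collections import Counter


def zipf_weights(vocab_size, exponent=1.0):
    """Unnormalized Zipf weights: the word at rank r gets weight 1 / r**exponent."""
    return [1.0 / (rank ** exponent) for rank in range(1, vocab_size + 1)]


def sample_words(vocab, n_words, seed=0):
    """Draw n_words from vocab, where vocab[0] is treated as the most frequent word."""
    rng = random.Random(seed)
    return rng.choices(vocab, weights=zipf_weights(len(vocab)), k=n_words)


# Hypothetical 50-word vocabulary, ordered by rank.
vocab = ["w%d" % i for i in range(1, 51)]
counts = Counter(sample_words(vocab, 20000))
top3 = [word for word, _ in counts.most_common(3)]
```

    With enough samples, the word at rank 1 should occur roughly twice as often as the word at rank 2 and roughly fifty times as often as the word at rank 50.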

    Note: the data generation programs used here are in the hadoop-mapreduce-examples-2.6.0 jar and can be inspected with a decompiler.



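    1. The main flow is to configure the workloads to test, the language, and the DataSize under conf, and then run run-all.sh under bin to complete one test. This flow is done by hand; a script can repeat these steps to run multiple tests with less manual work: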
    #       Time: 20160930, created by sunfei
    #       Describe: automatically run HiBench
    #       Functions:
    #            search(): find the style of application in 99-user_defined_properties.conf, e.g. tiny, small
    #            exec_application_noSQL(): run the application several times, without Hive
    #            exec_application_SQL(): run the application several times, with Hive
    #            save_result(): save the result of the application
    #            main_function(): run all the applications
    #            main(): run the selected kind of application
            # per-core load: 15-minute load average divided by the number of CPUs
            cpu=`grep -c 'model name' /proc/cpuinfo`
            load_15=`uptime | awk '{print $NF}'`
            average_load=`echo "scale=2;a=${load_15}/${cpu};if(length(a)==scale(a)) print 0;print a" | bc`
            date >> datetime-load.txt
            echo ${average_load} >> cpu-load.txt
            paste datetime-load.txt cpu-load.txt >> load-day.txt
    search() {
        # collect the hibench.scale.profile line(s) of the config file
        sed -n '/hibench.scale.profile/p' ${config} >> hibench.txt
        while read line
        do
            if [ ${line:0:13} = "hibench.scale" ];then
                echo -e "\033[32m match successful! \033[0m"
                # ... (the statement that stores the current profile in ${var} is elided in the source)
            fi
        done < hibench.txt
        if [ "$var" = "${1}" ];then
            echo -e "\033[31m The style of application can't be the same, do you want to continue? yes | no \033[0m"
            read -p "Input your choice: " chose
            if [ "${chose}" = "no" ];then
                exit 1
            else
                echo -e "\033[32m The ${1} style of application will be run! \033[0m"
            fi
        fi
        if [ -f "hibench.txt" ];then
            rm -rf "hibench.txt"
            echo -e "\033[32m The hibench.txt has been deleted! \033[0m"
        fi
        echo -e "\033[32m The application will run the ${1} style \033[0m"
        # switch the profile in the config file to the requested one
        sed -i "s/${var}/${1}/" ${config}
    }
    exec_application_noSQL() {
        for ((i=1;i<=${1};i++))
        do
            # $i%1 is always 0, so the HDFS cleanup below runs before every iteration
            let "var=$i%1"
            if [ "$var" -eq 0 ];then
                hadoop fs -rm -r hdfs://archive.cloudera.com:8020/user/hdfs/.Trash/*
                hadoop fs -rm -r hdfs://archive.cloudera.com:8020/HiBench/*
            fi
            echo -e "\033[32m **********************The current times is ********************:\033[0m" ${i}
            # ... (the command that launches the workload is elided in the source)
            echo -e "\033[32m ********************** The current time is ${i}, and it has finished successfully! ********************:\033[0m"
        done
        echo -e "\033[32m *********The application has finished, please modify the configuration!***** \033[0m"
    }
    exec_application_SQL() {
        for ((i=1;i<=${1};i++))
        do
            # drop the Hive tables left over from the previous run
            echo "drop table uservisits;drop table uservisits_aggre;drop table rankings;drop table rankings_uservisits_join;drop table uservisits_copy;exit;" | /usr/bin/hive
            # $i%1 is always 0, so the HDFS cleanup below runs before every iteration
            let "var=$i%1"
            if [ "$var" -eq 0 ];then
                hadoop fs -rm -r hdfs://archive.cloudera.com:8020/user/hdfs/.Trash/*
                hadoop fs -rm -r hdfs://archive.cloudera.com:8020/HiBench/*
            fi
            echo -e "\033[32m **********************The current times is ********************:\033[0m" ${i}
            # ... (the command that launches the workload is elided in the source)
            echo -e "\033[32m **********************The current time is ${i}, and it has finished successfully! ********************:\033[0m"
        done
        echo -e "\033[32m *********The application has finished, please modify the configuration!***** \033[0m"
    }
    save_result() {
        if [ -f result.txt ];then
            rm -rf result.txt
            echo -e "\033[32m The result.txt has been deleted! \033[0m"
        fi
        #select the words in the report
        var1=`date +"%m/%d/%Y-%k:%M:%S"`
        case ${1} in
            # ... (the per-application branches are elided in the source)
            *)
                echo -e "\033[32m The name of application is wrong, please change it! \033[0m"
                ;;
        esac
        # copy the report lines that match ${word} into ${var4}
        while read line
        do
            echo $line | sed -n "/${word}/p" >> ${var4}
        done <$filepath
        echo -e "\033[32m The job has finished! \033[0m"
    }
    main_function() {
        #Input the name of application need to exec
        for appName in aggregation join scan pagerank sleep sort wordcount bayes terasort kmeans
        do
            echo "The name of application is :"${appName}
            echo ${appName} > ${appConfig}
            for style in tiny small large huge gigantic
            do
                search ${style}
                # the Hive-based workloads need their tables dropped between runs
                if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                    exec_application_SQL ${1}
                else
                    exec_application_noSQL ${1}
                fi
            done
            save_result ${appName}
        done
    }
    main() {
        # run the application
        read -p "Input the times of exec: " times
        if [ "${times}" -eq 0 -o "${times}" -gt 60 ];then
            echo -e "\033[31m The times of application can't be 0 or greater than 60! Do you want to continue? yes | no \033[0m"
            read -p "Input your choice: " chose
            if [ "${chose}" = "no" ];then
                exit 1
            else
                echo -e "\033[32m The application will be run ${times} times! \033[0m"
            fi
        fi
        echo -e "\033[33m Select the style of application: \033[0m \033[31m All | Signal \033[0m"
        read -p "Input your choice: " style
        if [ "${style}" = "" ];then
            echo -e "\033[31m The style of application can't be empty \033[0m"
            exit 1
        elif [ "${style}" != "All" -a "${style}" != "Signal" ];then
            echo -e "\033[31m The style of application is wrong, please correct! \033[0m"
            exit 1
        else
            echo -e "\033[32m The style of application is ok! \033[0m"
        fi
        if [ "All" = "${style}" ];then
            main_function ${times}
        else
            echo -e "\033[33m Input the name of application, e.g.:\033[0m \033[31m aggregation | join | scan | kmeans | pagerank | sleep | sort | wordcount | bayes | terasort \033[0m"
            read -p "Input your choice: " application
            if [ "${application}" = "" ];then
                echo -e "\033[31m The name of application can't be empty! \033[0m"
                exit 1
            fi
            echo "********************The ${application} will be exec**********************"
            read -p "Do you want to exec all the styles of application, e.g. tiny, small, large, huge, gigantic? yes | no " chose
            if [ "${chose}" = "" ];then
                echo -e "\033[31m The style of application can't be empty! \033[0m"
                exit 1
            elif [ "yes" != "${chose}" ] && [ "no" != "${chose}" ];then
                echo -e "\033[31m The style of application is wrong, please correct! \033[0m"
                exit 1
            else
                echo -e "\033[32m The style of application is ok! \033[0m"
            fi
            read -p "Input the style of application, e.g. ( tiny small large huge gigantic )! " appStyle
            echo "***************************The ${appStyle} style will be exec***************************"
            for appName in ${application}
            do
                echo ${appName} > ${appConfig}
                if [ "yes" = "${chose}" ];then
                    for var in tiny small large huge gigantic
                    do
                        echo "******************The ${appName} will be exec!************************************"
                        search ${var}
                        if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                            exec_application_SQL ${times}
                        else
                            exec_application_noSQL ${times}
                        fi
                    done
                else
                    #       read -p "Input the style of application, e.g. ( tiny small large huge gigantic )!" appStyle
                    echo "**************************The ${appName} will be exec!************************"
                    if [ "${appStyle}" = "" ];then
                        echo -e "\033[31m The style of application can't be empty! \033[0m"
                        exit 1
                    fi
                    for var in ${appStyle}
                    do
                        search ${var}
                        if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                            exec_application_SQL ${times}
                        else
                            exec_application_noSQL ${times}
                        fi
                    done
                fi
                save_result ${appName}
            done
        fi
    }
    # the main function of application
    main
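
    The script switches the data scale between runs by rewriting the `hibench.scale.profile` entry of the config file with `sed`. The following is a minimal sketch of the same step, assuming the entry is a whitespace-separated `key value` line; the function name `set_scale_profile` and the sample config text are hypothetical, and this is not part of the script above.

```python
import re

PROFILE_KEY = "hibench.scale.profile"


def set_scale_profile(conf_text, profile):
    """Return conf_text with the value of hibench.scale.profile replaced by profile."""
    pattern = re.compile(r"^(\s*" + re.escape(PROFILE_KEY) + r"\s+)(\S+)", re.MULTILINE)
    # Keep the key and its spacing, swap only the value.
    new_text, replaced = pattern.subn(lambda m: m.group(1) + profile, conf_text)
    if replaced == 0:
        raise ValueError("no %s line found" % PROFILE_KEY)
    return new_text


# Illustrative config fragment (assumed layout, not copied from a real installation).
sample_conf = "hibench.hadoop.home   /opt/hadoop\nhibench.scale.profile   tiny\n"
updated = set_scale_profile(sample_conf, "large")
```

    Matching on the key rather than on the old value avoids accidentally rewriting another line that happens to contain the same word, which a plain `s/old/new/` substitution could do.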
    1. prepare.sh -> run.sh is the sub-flow of run-all.sh;
    2. enter_bench -> … -> leave_bench is the sub-flow of prepare.sh and run.sh;
    3. enter_bench … gen_report and the like are common functions in workload-functions.sh.



    2.1 Analysis of the data generation code, entry point: HiBench.DataGen


    In the DataGen class: DataOptions options = new DataOptions(args); 

    case BAYES: {
                    BayesData data = new BayesData(options);


    package HiBench;

    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.util.Random;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.hadoop.mapred.lib.NLineInputFormat;

    public class BayesData {

        private static final Log log = LogFactory.getLog(BayesData.class.getName());

        private DataOptions options;
        private Dummy dummy;
        private int cgroups;

        BayesData(DataOptions options) {
            this.options = options;
        }

        // Reads the number of classes from the "-class" argument.
        private void parseArgs(String[] args) {
            for (int i=0; i<args.length; i++) {
                if ("-class".equals(args[i])) {
                    cgroups = Integer.parseInt(args[++i]);
                } else {
                    DataOptions.printUsage("Unknown bayes data arguments -- " + args[i] + "!!!");
                }
            }
        }

        // Mapper: each input record is a slot id that is expanded into a range of labeled pages.
        private static class CreateBayesPages extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

            private static final Log log = LogFactory.getLog(CreateBayesPages.class.getName());

            private long pages, slotpages;
            private int groups;
            private HtmlCore generator;
            private Random rand;

            public void configure(JobConf job) {
                try {
                    pages = job.getLong("pages", 0);
                    slotpages = job.getLong("slotpages", 0);
                    groups = job.getInt("groups", 0);
                    generator = new HtmlCore(job);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                }
            }

            public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {

                int slotId = Integer.parseInt(value.toString().trim());
                long[] range = HtmlCore.getPageRange(slotId, pages, slotpages);

                // seed per slot so that every slot generates a reproducible sequence
                rand = new Random(slotId * 1000 + 101);

                Text k = new Text();
                for (long i=range[0]; i<range[1]; i++) {
                    String classname = "/class" + rand.nextInt(groups);
                    // ... (the statements that fill k and value are elided in the source)
                    output.collect(k, value);
                    if (0==(i % 10000)) {
                        log.info("still running: " + (i - range[0]) + " of " + slotpages);
                    }
                }
            }
        }

        private void setBayesOptions(JobConf job) throws URISyntaxException {
            job.setLong("pages", options.getNumPages());
            job.setLong("slotpages", options.getNumSlotPages());
            job.setInt("groups", cgroups);

            Utils.shareWordZipfCore(options, job);
        }

        private void createBayesData() throws IOException, URISyntaxException {

            log.info("creating bayes text data ... ");

            JobConf job = new JobConf();

            Path fout = options.getResultPath();
            String jobname = "Create bayes data";
            // ... (part of the job configuration is elided in the source)
            Utils.shareDict(options, job);
            FileInputFormat.setInputPaths(job, dummy.getPath());
            FileOutputFormat.setOutputPath(job, fout);

            log.info("Running Job: " +jobname);
            log.info("Pages file " + dummy.getPath() + " as input");
            log.info("Rankings file " + fout + " as output");
            // ... (the job submission is elided in the source)
            log.info("Finished Running Job: " + jobname);
        }

        private void init() throws IOException {

            Utils.checkHdfsPath(options.getResultPath(), true);
            Utils.checkHdfsPath(options.getWorkPath(), true);

            dummy = new Dummy(options.getWorkPath(), options.getNumMaps());

            int words = RawData.putDictToHdfs(new Path(options.getWorkPath(), HtmlCore.getDictName()), options.getNumWords());
        }

        public void generate() throws Exception {
            // ... (body elided in the source)
        }

        private void close() throws IOException {
            log.info("Closing bayes data generator...");
            // ... (remainder elided in the source)
        }
    }
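
    The mapper above works on "slots": each map task receives a slot id, asks `HtmlCore.getPageRange` for its page range, seeds a random generator from the slot id, and labels each page with `/classN`. The following is a minimal sketch of that partitioning idea. The range layout in `page_range` (1-based slot ids, contiguous half-open ranges) is an assumption rather than the actual `getPageRange` implementation, and Python's generator will not reproduce the sequence of `java.util.Random`.

```python
import random


def page_range(slot_id, pages, slot_pages):
    """Half-open page range [start, end) for a 1-based slot id (assumed layout)."""
    start = (slot_id - 1) * slot_pages
    end = min(pages, start + slot_pages)
    return start, end


def labels_for_slot(slot_id, pages, slot_pages, groups):
    """Assign a '/classN' label to every page of one slot, reproducibly per slot."""
    rng = random.Random(slot_id * 1000 + 101)  # same seeding idea as the mapper
    start, end = page_range(slot_id, pages, slot_pages)
    return [(page, "/class%d" % rng.randrange(groups)) for page in range(start, end)]


# 25 pages split into slots of 10 pages, 3 classes (hypothetical sizes).
first = labels_for_slot(1, pages=25, slot_pages=10, groups=3)
last = labels_for_slot(3, pages=25, slot_pages=10, groups=3)
```

    Seeding from the slot id means a slot produces the same labels regardless of which task runs it or how often it is retried, which keeps the generated data set reproducible.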


    [hdfs@sf11 prepare]$ ./prepare.sh 
    patching args= 
    Parsing conf: /opt/HiBench/HiBench-master/conf/00-default-properties.conf 
    Parsing conf: /opt/HiBench/HiBench-master/conf/01-default-streamingbench.conf 
    Parsing conf: /opt/HiBench/HiBench-master/conf/10-data-scale-profile.conf 
    Parsing conf: /opt/HiBench/HiBench-master/conf/20-samza-common.conf 
    Parsing conf: /opt/HiBench/HiBench-master/conf/30-samza-workloads.conf 
    Parsing conf: /opt/HiBench/HiBench-master/conf/99-user_defined_properties.conf 
    Parsing conf: /opt/HiBench/HiBench-master/workloads/bayes/conf/00-bayes-default.conf 
    Parsing conf: /opt/HiBench/HiBench-master/workloads/bayes/conf/10-bayes-userdefine.conf 
    probe sleep jar: /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/share/hadoop/mapreduce2/hadoop-mapreduce-client-jobclient-tests.jar 
    start HadoopPrepareBayes bench 
    /opt/HiBench/HiBench-master/bin/functions/workload-functions.sh: line 120: /dev/stderr: Permission denied 
    rm: `hdfs://archive.cloudera.com:8020/HiBench/Bayes/Input': No such file or directory 
    Submit MapReduce Job: /opt/cloudera/parcels/CDH/lib/hadoop/bin/hadoop --config /etc/hadoop/conf jar /opt/HiBench/HiBench-master/src/autogen/target/autogen-5.0-SNAPSHOT-jar-with-dependencies.jar HiBench.DataGen -t bayes -b hdfs://archive.cloudera.com:8020/HiBench/Bayes -n Input -m 300 -r 1600 -p 500000 -class 100 -o sequence 
    16/10/21 16:34:02 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
    16/10/21 16:34:32 INFO HiBench.BayesData: Closing bayes data generator... 
    finish HadoopPrepareBayes bench



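    After nearly two weeks of reading and testing the HiBench code, I finally worked out the flow described above. Intel's test framework is indeed fairly concise: using configuration files, shell scripts, and the examples bundled with the big data frameworks (for instance, the wordcount test in HiBench directly calls the program shipped with Hadoop or Spark), it carries out the whole large body of testing work. Below, for the Bayesian text classification algorithm, we analyze each of the three languages HiBench uses: Python, Scala, and Java.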

    2.3 Python code analysis



    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #

    """
    A naive bayes program using MLlib.

    This example requires NumPy (http://www.numpy.org/).
    """
    import sys

    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.classification import NaiveBayes
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import Vectors
    from pyspark.storagelevel import StorageLevel
    from operator import add
    from itertools import groupby

    # Adopted from spark's doc: http://spark.apache.org/docs/latest/mllib-naive-bayes.html
    # Note: this is Python 2 code (tuple-unpacking lambdas, print statements).

    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print >> sys.stderr, "Usage: bayes <file>"
            sys.exit(-1)
        sc = SparkContext(appName="PythonNaiveBayes")
        filename = sys.argv[1]

        data = sc.sequenceFile(filename, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
        # count the occurrences of every word across all documents
        wordCount = data \
            .flatMap(lambda (key, doc):doc.split(" ")) \
            .map(lambda x:(x, 1)) \
            .reduceByKey(add)

        wordSum = wordCount.map(lambda x:x[1]).reduce(lambda x,y:x+y)
        # word -> (index, relative frequency), collected on the driver so it can be broadcast
        wordDict = wordCount.zipWithIndex() \
            .map(lambda ((key, count), index): (key, (index, count*1.0 / wordSum)) ) \
            .collectAsMap()
        sharedWordDict = sc.broadcast(wordDict)

        # for each document, generate vector based on word freq
        def doc2vector(dockey, doc):
            # map to word index: freq
            # combine freq with same word
            docVector = [(key, sum((z[1] for z in values))) for key, values in
                         groupby(sorted([sharedWordDict.value[x] for x in doc.split(" ")],
                                        key=lambda x:x[0]),
                                 key=lambda x:x[0])]

            (indices, values) = zip(*docVector)      # unzip
            label = float(dockey[6:])                # strip the "/class" prefix
            return label, indices, values

        vector = data.map( lambda (dockey, doc) : doc2vector(dockey, doc))

        # vector dimension: largest word index seen, plus one
        d = vector.map( lambda (label, indices, values) : indices[-1] if indices else 0) \
                  .reduce(lambda a,b:max(a,b)) + 1

    #    print "###### Load svm file", filename
        #examples = MLUtils.loadLibSVMFile(sc, filename, numFeatures = numFeatures)
        examples = vector.map( lambda (label, indices, values) : LabeledPoint(label, Vectors.sparse(d, indices, values)))

        # FIXME: need randomSplit!
        training = examples.sample(False, 0.8, 2)
        test = examples.sample(False, 0.2, 2)

        numTraining = training.count()
        numTest = test.count()
        print " numTraining = %d, numTest = %d." % (numTraining, numTest)
        model = NaiveBayes.train(training, 1.0)

        model_share = sc.broadcast(model)
        predictionAndLabel = test.map( lambda x: (x.label, model_share.value.predict(x.features)))
    #    prediction = model.predict(test.map( lambda x: x.features ))
    #    predictionAndLabel = prediction.zip(test.map( lambda x:x.label ))
        accuracy = predictionAndLabel.filter(lambda x: x[0] == x[1]).count() * 1.0 / numTest

        print "Test accuracy = %s." % accuracy

        sc.stop()
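
    The vectorization step is easier to follow without Spark. The following is a minimal pure-Python 3 sketch of the same idea: build a dictionary from word to (index, corpus frequency), then turn a document into a sparse vector by summing the frequency once per occurrence of each word. The function names and the tiny corpus are hypothetical, and indices are assigned in sorted word order here for determinism, whereas the Spark job takes them from `zipWithIndex`.

```python
from collections import Counter


def build_word_dict(docs):
    """Map each word to (index, relative frequency over the whole corpus)."""
    counts = Counter(word for doc in docs for word in doc.split(" "))
    total = sum(counts.values())
    return {word: (index, count / total)
            for index, (word, count) in enumerate(sorted(counts.items()))}


def doc_to_vector(dockey, doc, word_dict):
    """Return (label, indices, values) with one summed entry per distinct word."""
    acc = {}
    for word in doc.split(" "):
        index, freq = word_dict[word]
        acc[index] = acc.get(index, 0.0) + freq
    indices = sorted(acc)
    label = float(dockey[6:])  # "/class7" -> 7.0
    return label, indices, [acc[i] for i in indices]


# Hypothetical two-document corpus keyed by class label.
docs = {"/class0": "a b a", "/class1": "b c"}
word_dict = build_word_dict(docs.values())
label, indices, values = doc_to_vector("/class0", docs["/class0"], word_dict)
```

    Because the indices come out sorted, the last index of each document is its largest one, which is why the job can compute the vector dimension from `indices[-1]`.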

    2.4 Scala code analysis

    run-spark-job org.apache.spark.examples.mllib.SparseNaiveBayes ${INPUT_HDFS}

    Clearly, the Scala naive Bayes workload simply calls the code in the Spark MLlib library.
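
    For readers unfamiliar with what such a trainer computes, the following is a minimal sketch of multinomial naive Bayes with additive smoothing, written in plain Python. It is an illustration of the technique, not MLlib's implementation; the function names, the sparse-dict input format, and the toy data are assumptions.

```python
import math
from collections import defaultdict


def train_naive_bayes(samples, num_features, smoothing=1.0):
    """samples: list of (label, {feature_index: value}). Returns (log priors, log likelihoods)."""
    class_docs = defaultdict(int)
    feature_sums = defaultdict(lambda: [0.0] * num_features)
    for label, features in samples:
        class_docs[label] += 1
        for index, value in features.items():
            feature_sums[label][index] += value
    n_docs, n_classes = len(samples), len(class_docs)
    log_prior, log_theta = {}, {}
    for label, doc_count in class_docs.items():
        # Smoothed class prior and smoothed per-feature likelihoods, both in log space.
        log_prior[label] = math.log((doc_count + smoothing) / (n_docs + n_classes * smoothing))
        total = sum(feature_sums[label]) + smoothing * num_features
        log_theta[label] = [math.log((s + smoothing) / total) for s in feature_sums[label]]
    return log_prior, log_theta


def predict(model, features):
    """Pick the label with the highest log posterior for a sparse feature dict."""
    log_prior, log_theta = model

    def score(label):
        return log_prior[label] + sum(v * log_theta[label][i] for i, v in features.items())

    return max(log_prior, key=score)


# Toy training set: class 0.0 favors feature 0, class 1.0 favors feature 2.
train = [(0.0, {0: 3.0, 1: 1.0}), (0.0, {0: 2.0}), (1.0, {2: 4.0}), (1.0, {1: 1.0, 2: 2.0})]
model = train_naive_bayes(train, num_features=3)
```

    The `smoothing` argument plays the role of the `1.0` passed to `NaiveBayes.train` in the Python workload: it keeps features never seen in a class from driving that class's probability to zero.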


    2.5 Java code analysis

    run-spark-job com.intel.sparkbench.bayes.JavaBayes ${INPUT_HDFS}


     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *    http://www.apache.org/licenses/LICENSE-2.0
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
    package com.intel.sparkbench.bayes;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.mllib.classification.NaiveBayesModel;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.storage.StorageLevel;
    import scala.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.util.MLUtils;
    import org.apache.spark.mllib.classification.NaiveBayes;
    import org.apache.hadoop.io.Text;
    import java.lang.Boolean;
    import java.lang.Double;
    import java.lang.Long;
    import java.util.*;
    import java.util.regex.Pattern;
     * Adopted from spark's doc: http://spark.apache.org/docs/latest/mllib-naive-bayes.html
    public final class JavaBayes {
      private static final Pattern SPACE = Pattern.compile(" ");
      public static void main(String[] args) throws Exception {
        if (args.length < 1) {
          System.err.println("Usage: JavaBayes <file>");
        Random rand = new Random();
        SparkConf sparkConf = new SparkConf().setAppName("JavaBayes");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    //    int numFeatures = Integer.parseInt(args[1]);
        // Generate vectors according to input documents
        JavaPairRDD<String, String> data = ctx.sequenceFile(args[0], Text.class, Text.class)
                .mapToPair(new PairFunction<Tuple2<Text, Text>, String, String>() {
                    public Tuple2<String, String> call(Tuple2<Text, Text> e) {
                        return new Tuple2<String, String>(e._1().toString(), e._2().toString());
        JavaPairRDD<String, Long> wordCount = data
                .flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                    public Iterable<String> call(Tuple2<String, String> e) {
                        return Arrays.asList(SPACE.split(e._2()));
                .mapToPair(new PairFunction<String, String, Long>() {
                    public Tuple2<String, Long> call(String e) {
                        return new Tuple2<String, Long>(e, 1L);
                .reduceByKey(new Function2<Long, Long, Long>() {
                    public Long call(Long i1, Long i2) {
                        return i1 + i2;
          final Long wordSum = wordCount.map(new Function<Tuple2<String, Long>, Long>(){
              public Long call(Tuple2<String, Long> e) {
                  return e._2();
          .reduce(new Function2<Long, Long, Long>() {
              public Long call(Long v1, Long v2) throws Exception {
                  return v1 + v2;
        List<Tuple2<String, Tuple2<Long, Double>>> wordDictList = wordCount.zipWithIndex()
                .map(new Function<Tuple2<Tuple2<String, Long>, Long>, Tuple2<String, Tuple2<Long, Double>>>() {
                    public Tuple2<String, Tuple2<Long, Double>> call(Tuple2<Tuple2<String, Long>, Long> e) throws Exception {
                        String key = e._1()._1();
                        Long count = e._1()._2();
                        Long index = e._2();
                        return new Tuple2<String, Tuple2<Long, Double>>(key, new Tuple2<Long, Double>(index,
                                count.doubleValue() / wordSum));
        Map<String, Tuple2<Long, Double>> wordDict = new HashMap();
        for (Tuple2<String, Tuple2<Long, Double>> item : wordDictList) {
            wordDict.put(item._1(), item._2());
        final Broadcast<Map<String, Tuple2<Long, Double>>> sharedWordDict = ctx.broadcast(wordDict);
        // For each document, build a sparse vector from the word frequencies.
        JavaRDD<Tuple3<Double, Long[], Double[]>> vector = data.map(new Function<Tuple2<String, String>, Tuple3<Double, Long[], Double[]>>() {
            public Tuple3<Double, Long[], Double[]> call(Tuple2<String, String> v1) throws Exception {
                String dockey = v1._1();
                String doc = v1._2();
                String[] keys = SPACE.split(doc);
                Tuple2<Long, Double>[] datas = new Tuple2[keys.length];
                for (int i = 0; i < keys.length; i++) {
                    datas[i] = sharedWordDict.getValue().get(keys[i]);
                }
                // Sum the weights of repeated words under their dictionary index.
                Map<Long, Double> vector = new HashMap<Long, Double>();
                for (int i = 0; i < datas.length; i++) {
                    Long indic = datas[i]._1();
                    Double value = datas[i]._2();
                    if (vector.containsKey(indic)) {
                        vector.put(indic, value + vector.get(indic));
                    } else {
                        vector.put(indic, value);
                    }
                }
                // Emit the indices in ascending order, as a sparse vector requires.
                Long[] indices = new Long[vector.size()];
                Double[] values = new Double[vector.size()];
                SortedSet<Long> sortedKeys = new TreeSet<Long>(vector.keySet());
                int c = 0;
                for (Long key : sortedKeys) {
                    indices[c] = key;
                    values[c] = vector.get(key);
                    c++;
                }
                // The label is the numeric part of the document key after its first six characters.
                Double label = Double.parseDouble(dockey.substring(6));
                return new Tuple3<Double, Long[], Double[]>(label, indices, values);
            }
        });
        // Feature dimension: the largest index seen in any document, plus one.
        final Long d = vector
                .map(new Function<Tuple3<Double, Long[], Double[]>, Long>() {
                    public Long call(Tuple3<Double, Long[], Double[]> v1) throws Exception {
                        Long[] indices = v1._2();
                        if (indices.length > 0) {
                            return indices[indices.length - 1];
                        } else {
                            return Long.valueOf(0);
                        }
                    }
                })
                .reduce(new Function2<Long, Long, Long>() {
                    public Long call(Long v1, Long v2) throws Exception {
                        return v1 > v2 ? v1 : v2;
                    }
                }) + 1;
        // Convert each (label, indices, values) triple into an MLlib LabeledPoint.
        RDD<LabeledPoint> examples = vector.map(new Function<Tuple3<Double, Long[], Double[]>, LabeledPoint>() {
            public LabeledPoint call(Tuple3<Double, Long[], Double[]> v1) throws Exception {
                int[] intIndices = new int[v1._2().length];
                double[] intValues = new double[v1._3().length];
                for (int i = 0; i < v1._2().length; i++) {
                    intIndices[i] = v1._2()[i].intValue();
                    intValues[i] = v1._3()[i];
                }
                return new LabeledPoint(v1._1(), Vectors.sparse(d.intValue(),
                        intIndices, intValues));
            }
        }).rdd();
        //RDD<LabeledPoint> examples = MLUtils.loadLibSVMFile(ctx.sc(), args[0], false, numFeatures);
        // Hold out 20% of the examples for testing.
        RDD<LabeledPoint>[] split = examples.randomSplit(new double[]{0.8, 0.2}, rand.nextLong());
        JavaRDD<LabeledPoint> training = split[0].toJavaRDD();
        JavaRDD<LabeledPoint> test = split[1].toJavaRDD();

        // Train naive Bayes with smoothing parameter lambda = 1.0.
        final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
        JavaRDD<Double> prediction =
            test.map(new Function<LabeledPoint, Double>() {
                public Double call(LabeledPoint p) {
                    return model.predict(p.features());
                }
            });
        JavaPairRDD<Double, Double> predictionAndLabel =
            prediction.zip(test.map(new Function<LabeledPoint, Double>() {
                public Double call(LabeledPoint p) {
                    return p.label();
                }
            }));
        // Accuracy is the fraction of test points whose prediction equals the label.
        double accuracy = (double) predictionAndLabel.filter(
                new Function<Tuple2<Double, Double>, Boolean>() {
                    public Boolean call(Tuple2<Double, Double> pl) {
                        return pl._1().equals(pl._2());
                    }
                }).count() / test.count();
        System.out.println(String.format("Test accuracy = %f", accuracy));
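
The dictionary and vector-building steps in the listing above can be easier to follow without the Spark plumbing. The following is only a minimal sketch on in-memory collections, not HiBench code: the class `SparseVectorSketch`, its nested types, and both method names are hypothetical, it uses Java 8 features (`Map.merge`, diamond operator) for brevity, and it assumes that words missing from the dictionary are simply skipped.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of HiBench): mirrors the dictionary and
// vector-building steps of the Spark listing on plain Java collections.
final class SparseVectorSketch {

    /** Dictionary entry: feature index and relative frequency of a word. */
    static final class DictEntry {
        final int index;
        final double weight;
        DictEntry(int index, double weight) {
            this.index = index;
            this.weight = weight;
        }
    }

    /** Sparse vector as parallel arrays with ascending indices. */
    static final class Sparse {
        final int[] indices;
        final double[] values;
        Sparse(int[] indices, double[] values) {
            this.indices = indices;
            this.values = values;
        }
    }

    /** Maps each word to (position in iteration order, count / total count). */
    static Map<String, DictEntry> buildDictionary(Map<String, Long> wordCount) {
        long total = 0;
        for (long c : wordCount.values()) {
            total += c;
        }
        Map<String, DictEntry> dict = new LinkedHashMap<>();
        int index = 0;
        for (Map.Entry<String, Long> e : wordCount.entrySet()) {
            dict.put(e.getKey(), new DictEntry(index++, (double) e.getValue() / total));
        }
        return dict;
    }

    /** Sums the weights of the document's words per index, sorted by index. */
    static Sparse vectorize(String doc, Map<String, DictEntry> dict) {
        TreeMap<Integer, Double> acc = new TreeMap<>();
        for (String word : doc.trim().split("\\s+")) {
            DictEntry e = dict.get(word);
            if (e == null) {
                continue; // assumption: unknown words are ignored
            }
            acc.merge(e.index, e.weight, Double::sum);
        }
        int[] indices = new int[acc.size()];
        double[] values = new double[acc.size()];
        int c = 0;
        for (Map.Entry<Integer, Double> kv : acc.entrySet()) {
            indices[c] = kv.getKey();
            values[c] = kv.getValue();
            c++;
        }
        return new Sparse(indices, values);
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("a", 2L);
        counts.put("b", 1L);
        counts.put("c", 1L);
        Sparse v = vectorize("c a a", buildDictionary(counts));
        System.out.println(Arrays.toString(v.indices) + " " + Arrays.toString(v.values));
    }
}
```

A `TreeMap` is used here so that the indices come out sorted without a separate `TreeSet` pass; the listing achieves the same ordering with a `HashMap` plus a `SortedSet` of its keys.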


    Type              Date        Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
    JavaSparkBayes    2016-10-09  16:41:09  113387030        48.857       2320793              2320793
    ScalaSparkBayes   2016-10-09  16:42:00  113387030        45.164       2510562              2510562
    PythonSparkBayes  2016-10-09  16:44:03  113387030        118.521      956683               956683
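
The throughput column in the report above appears to be the input size divided by the duration, truncated to a whole number, and the per-node column equals the total, which is consistent with a single-node run. The sketch below recomputes the column under that assumption; the class `ThroughputCheck` and its method are hypothetical names and not part of HiBench.

```java
// Hypothetical helper (not part of HiBench): recomputes the throughput
// column of the report from input size and duration.
final class ThroughputCheck {

    /** Bytes per second, truncated toward zero (assumed from the reported figures). */
    static long bytesPerSecond(long inputBytes, double durationSeconds) {
        return (long) (inputBytes / durationSeconds);
    }

    public static void main(String[] args) {
        long inputBytes = 113387030L;
        String[] types = {"JavaSparkBayes", "ScalaSparkBayes", "PythonSparkBayes"};
        double[] durations = {48.857, 45.164, 118.521};
        for (int i = 0; i < types.length; i++) {
            System.out.println(types[i] + " " + bytesPerSecond(inputBytes, durations[i]));
        }
    }
}
```

With the same input size for all three runs, the throughput differences come entirely from the duration: the Python run takes roughly 2.4 to 2.6 times as long as the Java and Scala runs.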


    hibench.bayes.tiny.pages 25000 
    hibench.bayes.tiny.classes 10 
    hibench.bayes.tiny.ngrams 1 
    hibench.bayes.small.pages 30000 
    hibench.bayes.small.classes 100 
    hibench.bayes.small.ngrams 2 
    hibench.bayes.large.pages 100000 
    hibench.bayes.large.classes 100 
    hibench.bayes.large.ngrams 2 
    hibench.bayes.huge.pages 500000 
    hibench.bayes.huge.classes 100 
    hibench.bayes.huge.ngrams 2 
    hibench.bayes.gigantic.pages 1000000 
    hibench.bayes.gigantic.classes 100 
    hibench.bayes.gigantic.ngrams 2 
    hibench.bayes.bigdata.pages 20000000 
    hibench.bayes.bigdata.classes 20000 
    hibench.bayes.bigdata.ngrams 2
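
Each property above follows the pattern `hibench.bayes.<profile>.<parameter> <value>`, so the data-size profile selects one group of three values. As a rough illustration of that grouping, the sketch below parses such lines into a map keyed by profile name. The class `BayesProfiles` and its `parse` method are hypothetical; HiBench has its own configuration loader, which this does not reproduce.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of HiBench): groups the Bayes data-size
// properties by profile name.
final class BayesProfiles {

    static final String PREFIX = "hibench.bayes.";

    /** Parses "hibench.bayes.<profile>.<param> <value>" lines into profile -> (param -> value). */
    static Map<String, Map<String, Long>> parse(String text) {
        Map<String, Map<String, Long>> profiles = new LinkedHashMap<>();
        for (String line : text.split("\\R")) {
            String[] kv = line.trim().split("\\s+");
            if (kv.length != 2 || !kv[0].startsWith(PREFIX)) {
                continue; // skip blank or unrelated lines
            }
            String[] parts = kv[0].substring(PREFIX.length()).split("\\.");
            if (parts.length != 2) {
                continue; // expect exactly <profile>.<param>
            }
            profiles.computeIfAbsent(parts[0], k -> new LinkedHashMap<>())
                    .put(parts[1], Long.parseLong(kv[1]));
        }
        return profiles;
    }

    public static void main(String[] args) {
        String conf = "hibench.bayes.tiny.pages 25000\n"
                + "hibench.bayes.tiny.classes 10\n"
                + "hibench.bayes.tiny.ngrams 1\n"
                + "hibench.bayes.large.pages 100000\n"
                + "hibench.bayes.large.classes 100\n"
                + "hibench.bayes.large.ngrams 2\n";
        System.out.println(parse(conf).get("tiny"));
    }
}
```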



  • Original article: https://www.cnblogs.com/candlia/p/11920274.html