1. Introduction to HiBench

HiBench contains nine typical Hadoop workloads, grouped into five categories: micro benchmarks, HDFS benchmarks, web search benchmarks, machine learning benchmarks, and data analytics benchmarks.

For details, see Section 3 of the CDH cluster installation & testing notes.

  • micro benchmarks 
    Sort: the input data is generated with hadoop randomtextwriter and then sorted. 
    Wordcount: counts the occurrences of each word in the input data, which is also generated with hadoop randomtextwriter. 
    TeraSort: the input data is produced by hadoop teragen and sorted by key.

  • hdfs benchmarks 
    Enhanced DFSIO: measures the HDFS throughput of a Hadoop cluster by spawning a large number of tasks that issue read and write requests at the same time.

  • web search benchmarks 
    Nutch indexing: large-scale search indexing; this workload exercises the indexing subsystem of Nutch (an Apache open-source search engine) with automatically generated web data whose links and words follow a Zipfian distribution, i.e. a word's frequency is inversely proportional to its rank in the frequency table (see the short sketch after the note below). 
    Pagerank: an implementation of the PageRank algorithm on Hadoop, also driven by automatically generated web data whose links follow a Zipfian distribution (for any term, the product of its frequency and its rank is roughly a constant).

  • machine learning benchmarks 
    Mahout Bayesian classification (bayes): large-scale machine learning; this workload exercises the naive Bayes trainer in Mahout (Apache's open-source machine learning library). The input is a set of automatically generated documents whose words follow a Zipfian distribution. 
    Mahout K-means clustering (kmeans): exercises the K-means clustering implementation in Mahout; the input data set is produced by GenKMeansDataset from uniform and Gaussian distributions.

  • data analytics benchmarks 
    Hive query benchmarks (hivebench): Hive queries (aggregation and join) that implement typical OLAP queries, run against automatically generated web data whose links follow a Zipfian distribution.

Note: the data-generation programs used here live in the hadoop-mapreduce-examples-2.6.0 jar and can be inspected with a decompiler.
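
The Zipfian distribution mentioned above can be made concrete with a small sketch (my own illustration, not HiBench code): the word at frequency rank r is assigned a count proportional to 1/r, so rank * count stays roughly constant.

# Minimal illustration of a Zipfian word distribution (not part of HiBench).
def zipf_frequencies(vocabulary, total_words):
    """Assign each word a count proportional to 1 / rank."""
    harmonic = sum(1.0 / r for r in range(1, len(vocabulary) + 1))
    return {word: int(total_words / (rank * harmonic))
            for rank, word in enumerate(vocabulary, start=1)}

if __name__ == "__main__":
    counts = zipf_frequencies(["the", "of", "and", "to", "a"], total_words=10000)
    for rank, (word, count) in enumerate(counts.items(), start=1):
        print(word, count, rank * count)   # rank * count is roughly constant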


2. The Bayes workflow in HiBench

  1. The main flow: under conf, configure which benchmarks to run, the implementation language, and the data size, then run bin/run-all.sh to complete one round of testing. Since this is a manual procedure, a script can repeat it to run several rounds and cut down the manual work; 
    e.g.
#!/bin/bash

#       Time: 20160930, created by sunfei
#       Description: automatically run the HiBench workloads
#       Functions:
#           search():                  find the scale profile of the workload in 99-user_defined_properties.conf, e.g. tiny, small, ...
#           exec_application_noSQL():  run the workload a given number of times, without Hive
#           exec_application_SQL():    run the workload a given number of times, with Hive
#           save_result():             save the result of the workload
#           main_function():           run all the workloads
#           main():                    entry point for running different kinds of workloads

cpuLoad()
{
    cpu=`grep -c 'model name' /proc/cpuinfo`
    load_15=`uptime | awk '{print $NF}'`
    average_load=`echo "scale=2;a=${load_15}/${cpu};if(length(a)==scale(a)) print 0;print a" | bc`
    date >> datetime-load.txt
    echo ${average_load} >> cpu-load.txt
    paste datetime-load.txt cpu-load.txt >> load-day.txt
}

search()
{
    #config="/opt/HiBench/HiBench-master/conf/99-user_defined_properties.conf"
    config=/usr/HiBench-master/conf/99-user_defined_properties.conf
    sed -n '/hibench.scale.profile/p' ${config} >> hibench.txt
    var=''
    while read line
    do
        if [ ${line:0:13} = "hibench.scale" ];then
            echo -e "\033[32m match successful! \033[0m"
            var=${line:22}
        fi
    done < "hibench.txt"
    if [ "$var" = "${1}" ];then
        echo -e "\033[31m The style of application is the same as the current one, do you want to continue? yes | no \033[0m"
        read -p "Input your choice :" chose
        if [ "${chose}" = "no" ];then
            exit 1
        else
            echo -e "\033[32m The ${1} style of application will be run! \033[0m"
        fi
    fi
    if [ -f "hibench.txt" ];then
        rm -rf "hibench.txt"
        echo -e "\033[32m The hibench.txt has been deleted! \033[0m"
    fi
    echo -e "\033[32m The application will run the "${1}" style \033[0m"
    sed -i "s/${var}/${1}/" ${config}
}

exec_application_noSQL()
{
    var=0
    for ((i=1;i<=${1};i++))
    do
        let "var=$i%1"
        if [ "$var" -eq 0 ];then
            hadoop fs -rm -r hdfs://archive.cloudera.com:8020/user/hdfs/.Trash/*
            hadoop fs -rm -r hdfs://archive.cloudera.com:8020/HiBench/*
        fi
        echo -e "\033[32m ********************** The current iteration is ********************:\033[0m" ${i}
        #/opt/HiBench/HiBench-master/bin/run-all.sh
        /usr/HiBench-master/bin/run-all.sh
        echo -e "\033[32m ********************** Iteration "${i}" has finished successfully! ********************\033[0m"
    done
    echo -e "\033[32m ********* The application has finished, please modify the configuration! ***** \033[0m"
}

exec_application_SQL()
{
    var=0
    for ((i=1;i<=${1};i++))
    do
        echo "drop table uservisits;drop table uservisits_aggre;drop table rankings;drop table rankings_uservisits_join;drop table uservisits_copy;exit;" | /usr/bin/hive
        let "var=$i%1"
        if [ "$var" -eq 0 ];then
            hadoop fs -rm -r hdfs://archive.cloudera.com:8020/user/hdfs/.Trash/*
            hadoop fs -rm -r hdfs://archive.cloudera.com:8020/HiBench/*
        fi
        echo -e "\033[32m ********************** The current iteration is ********************:\033[0m" ${i}
        #/opt/HiBench/HiBench-master/bin/run-all.sh
        /usr/HiBench-master/bin/run-all.sh
        echo -e "\033[32m ********************** Iteration "${i}" has finished successfully! ********************\033[0m"
    done
    echo -e "\033[32m ********* The application has finished, please modify the configuration! ***** \033[0m"
}

save_result()
{
    if [ -f result.txt ];then
        rm -rf result.txt
        echo -e "\033[32m The result.txt has been deleted! \033[0m"
    fi
    # select the matching lines in the report
    #filepath=/opt/HiBench/HiBench-master/report/hibench.report
    filepath=/usr/HiBench-master/report/hibench.report
    word=""
    var1=`date +"%m/%d/%Y-%k:%M:%S"`
    var2=${1}
    var5=".txt"
    var4=${var2}${var5}
    case ${1} in
        "aggregation") word="JavaSparkAggregation";;
        "join")        word="JavaSparkJoin";;
        "scan")        word="JavaSparkScan";;
        "kmeans")      word="JavaSparkKmeans";;
        "pagerank")    word="JavaSparkPagerank";;
        "sleep")       word="JavaSparkSleep";;
        "sort")        word="JavaSparkSort";;
        "wordcount")   word="JavaSparkWordcount";;
        "bayes")       word="JavaSparkBayes";;
        "terasort")    word="JavaSparkTerasort";;
        *) echo -e "\033[32m The name of application is wrong, please change it! \033[0m";;
    esac
    while read line
    do
        echo $line | sed -n "/${word}/p" >> ${var4}
    done < $filepath
    echo -e "\033[32m The job has finished! \033[0m"
}

main_function()
{
    # iterate over every application that needs to be executed
    for appName in aggregation join scan pagerank sleep sort wordcount bayes terasort kmeans
    do
        #appConfig=/opt/HiBench/HiBench-master/conf/benchmarks.lst
        appConfig=/usr/HiBench-master/conf/benchmarks.lst
        echo "The name of application is :"${appName}
        echo ${appName} > ${appConfig}
        for style in tiny small large huge gigantic
        do
            search ${style}
            if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                exec_application_SQL ${1}
            else
                exec_application_noSQL ${1}
            fi
        done
        save_result ${appName}
    done
}

main()
{
    # run the application
    read -p "Input the times of exec: " times
    if [ "${times}" -eq 0 -o "${times}" -gt 60 ];then
        echo -e "\033[31m The times of application can't be empty or gt 60 ! Do you want to continue ? yes | no\033[0m"
        read -p "Input your choice :" chose
        if [ "${chose}" = "no" ];then
            exit 1
        else
            echo -e "\033[32m The application will be run ${times} times ! \033[0m"
        fi
    fi
    echo -e "\033[33m Select the style of application : \033[0m \033[31m All | Signal \033[0m"
    read -p "Input your choice :" style
    if [ "${style}" = "" ];then
        echo -e "\033[31m The style of application can't be empty \033[0m"
        exit 1
    elif [ "${style}" != "All" -a "${style}" != "Signal" ];then
        echo -e "\033[31m The style of application is wrong, please correct! \033[0m"
        exit 1
    else
        echo -e "\033[32m The style of application is ok ! \033[0m"
    fi
    if [ "All" = "${style}" ];then
        main_function ${times}
    else
        echo -e "\033[033m Input the name of application, eg:\033[0m \033[31m aggregation | join | scan | kmeans | pagerank | sleep | sort | wordcount | bayes | terasort\033[0m"
        read -p "Input your choice :" application
        if [ "${application}" = "" ];then
            echo -e "\033[31m The name of application can't be empty! \033[0m"
            exit 1
        fi
        echo "******************** The ${application} will be exec **********************"
        appConfig=/usr/HiBench-master/conf/benchmarks.lst
        #appConfig=/opt/HiBench/HiBench-master/conf/benchmarks.lst
        read -p "Do you want to exec all the styles of application, eg:tiny,small,large,huge,gigantic? yes | no " chose
        if [ "${chose}" = "" ];then
            echo -e "\033[31m The style of application can't be empty! \033[0m"
            exit 1
        elif [ "yes" != ${chose} ] && [ "no" != ${chose} ];then
            echo -e "\033[31m The style of application is wrong, please correct! \033[0m"
            exit 1
        else
            echo -e "\033[32m The style of application is ok ! \033[0m"
        fi
        read -p "Input the style of application, eg:( tiny small large huge gigantic )!" appStyle
        echo "*************************** The ${appStyle} style will be exec ***************************"
        for appName in ${application}
        do
            echo ${appName} > ${appConfig}
            if [ "yes" = "${chose}" ];then
                for var in tiny small large huge gigantic
                do
                    echo "****************** The ${appName} will be exec! ************************************"
                    search ${var}
                    if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                        exec_application_SQL ${times}
                    else
                        exec_application_noSQL ${times}
                    fi
                done
            else
                #read -p "Input the style of application, eg:( tiny small large huge gigantic )!" appStyle
                echo "************************** The ${appName} will be exec! ************************"
                if [ "${appStyle}" = "" ];then
                    echo -e "\033[31m The style of application can't be empty! \033[0m"
                    exit 1
                fi
                for var in ${appStyle}
                do
                    search ${var}
                    if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                        exec_application_SQL ${times}
                    else
                        exec_application_noSQL ${times}
                    fi
                done
            fi
            save_result ${appName}
        done
    fi
}

# the main entry of the script
main
  2. prepare.sh -> run.sh is the sub-flow of run-all.sh;
  3. enter_bench -> … -> leave_bench is the sub-flow of prepare.sh and run.sh;
  4. enter_bench, …, gen_report and the like are common functions defined in workload-functions.sh.

The flow chart is as follows:

2.1 Data-generation code analysis; entry point: HiBench.DataGen

I am not very familiar with Java, but the entry point mainly relies on a switch statement.

In the DataGen class the arguments are parsed with DataOptions options = new DataOptions(args); 
for the bayes benchmark, the corresponding data-generation class is then invoked to produce the data. The relevant part of the dispatch code:

case BAYES: {
    BayesData data = new BayesData(options);
    data.generate();
    break;
}

The BayesData implementation:

package HiBench;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.NLineInputFormat;

public class BayesData {

    private static final Log log = LogFactory.getLog(BayesData.class.getName());

    private DataOptions options;
    private Dummy dummy;
    private int cgroups;

    BayesData(DataOptions options) {
        this.options = options;
        parseArgs(options.getRemainArgs());
    }

    private void parseArgs(String[] args) {
        for (int i=0; i<args.length; i++) {
            if ("-class".equals(args[i])) {
                cgroups = Integer.parseInt(args[++i]);
            } else {
                DataOptions.printUsage("Unknown bayes data arguments -- " + args[i] + "!!!");
                System.exit(-1);
            }
        }
    }

    private static class CreateBayesPages extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        private static final Log log = LogFactory.getLog(CreateBayesPages.class.getName());

        private long pages, slotpages;
        private int groups;
        private HtmlCore generator;
        private Random rand;

        public void configure(JobConf job) {
            try {
                pages = job.getLong("pages", 0);
                slotpages = job.getLong("slotpages", 0);
                groups = job.getInt("groups", 0);
                generator = new HtmlCore(job);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            int slotId = Integer.parseInt(value.toString().trim());
            long[] range = HtmlCore.getPageRange(slotId, pages, slotpages);
            generator.fireRandom(slotId);
            rand = new Random(slotId * 1000 + 101);

            Text k = new Text();
            for (long i=range[0]; i<range[1]; i++) {
                String classname = "/class" + rand.nextInt(groups);
                k.set(classname);
                value.set(generator.genBayesWords());
                output.collect(k, value);
                reporter.incrCounter(HiBench.Counters.BYTES_DATA_GENERATED,
                        k.getLength()+value.getLength());
                if (0==(i % 10000)) {
                    log.info("still running: " + (i - range[0]) + " of " + slotpages);
                }
            }
        }
    }

    private void setBayesOptions(JobConf job) throws URISyntaxException {
        job.setLong("pages", options.getNumPages());
        job.setLong("slotpages", options.getNumSlotPages());
        job.setInt("groups", cgroups);
        Utils.shareWordZipfCore(options, job);
    }

    private void createBayesData() throws IOException, URISyntaxException {

        log.info("creating bayes text data ... ");

        JobConf job = new JobConf();

        Path fout = options.getResultPath();
        Utils.checkHdfsPath(fout);

        String jobname = "Create bayes data";
        job.setJobName(jobname);

        Utils.shareDict(options, job);
        setBayesOptions(job);

        FileInputFormat.setInputPaths(job, dummy.getPath());
        job.setInputFormat(NLineInputFormat.class);

        job.setJarByClass(CreateBayesPages.class);
        job.setMapperClass(CreateBayesPages.class);
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, fout);
        job.setOutputFormat(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        log.info("Running Job: " +jobname);
        log.info("Pages file " + dummy.getPath() + " as input");
        log.info("Rankings file " + fout + " as output");
        JobClient.runJob(job);
        log.info("Finished Running Job: " + jobname);
    }

    private void init() throws IOException {
        Utils.checkHdfsPath(options.getResultPath(), true);
        Utils.checkHdfsPath(options.getWorkPath(), true);

        dummy = new Dummy(options.getWorkPath(), options.getNumMaps());

        int words = RawData.putDictToHdfs(new Path(options.getWorkPath(), HtmlCore.getDictName()), options.getNumWords());
        options.setNumWords(words);

        Utils.serialWordZipf(options);
    }

    public void generate() throws Exception {
        init();
        createBayesData();
        close();
    }

    private void close() throws IOException {
        log.info("Closing bayes data generator...");
        Utils.checkHdfsPath(options.getWorkPath());
    }
}

The output of prepare.sh is shown below. At the beginning it mainly parses the configuration files; it then calls hadoop with the data-generation jar to run a MapReduce job, which produces the input text for the Bayes classification workload. As described in Section 1 and in the official documentation, this text is built mainly from the Linux dictionary "/usr/share/dict/words" and follows a Zipfian distribution. Note that the submitted job's options -p 500000 -class 100 match the huge data profile listed at the end of this post.

[hdfs@sf11 prepare]$ ./prepare.sh 
patching args= 
Parsing conf: /opt/HiBench/HiBench-master/conf/00-default-properties.conf 
Parsing conf: /opt/HiBench/HiBench-master/conf/01-default-streamingbench.conf 
Parsing conf: /opt/HiBench/HiBench-master/conf/10-data-scale-profile.conf 
Parsing conf: /opt/HiBench/HiBench-master/conf/20-samza-common.conf 
Parsing conf: /opt/HiBench/HiBench-master/conf/30-samza-workloads.conf 
Parsing conf: /opt/HiBench/HiBench-master/conf/99-user_defined_properties.conf 
Parsing conf: /opt/HiBench/HiBench-master/workloads/bayes/conf/00-bayes-default.conf 
Parsing conf: /opt/HiBench/HiBench-master/workloads/bayes/conf/10-bayes-userdefine.conf 
probe sleep jar: /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/share/hadoop/mapreduce2/hadoop-mapreduce-client-jobclient-tests.jar 
start HadoopPrepareBayes bench 
/opt/HiBench/HiBench-master/bin/functions/workload-functions.sh: line 120: /dev/stderr: Permission denied 
rm: `hdfs://archive.cloudera.com:8020/HiBench/Bayes/Input': No such file or directory 
Submit MapReduce Job: /opt/cloudera/parcels/CDH/lib/hadoop/bin/hadoop --config /etc/hadoop/conf jar /opt/HiBench/HiBench-master/src/autogen/target/autogen-5.0-SNAPSHOT-jar-with-dependencies.jar HiBench.DataGen -t bayes -b hdfs://archive.cloudera.com:8020/HiBench/Bayes -n Input -m 300 -r 1600 -p 500000 -class 100 -o sequence 
16/10/21 16:34:02 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
16/10/21 16:34:32 INFO HiBench.BayesData: Closing bayes data generator... 
finish HadoopPrepareBayes bench

A sample of the generated data:

After nearly two weeks of reading the HiBench code and running tests, I finally worked out the execution flow described above. Intel's benchmark framework is actually quite concise: with configuration files, shell scripts, and the examples shipped with the big-data frameworks themselves (the wordcount workload, for instance, directly invokes the program bundled with Hadoop or Spark), it covers the whole large testing job. Below we analyze, one by one, the three language implementations HiBench provides for the naive Bayes text classification workload: Python, Scala, and Java.

2.3 Python code analysis

 

Part of the Python code:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#"""
A naive bayes program using MLlib.This example requires NumPy (http://www.numpy.org/).
"""import sysfrom pyspark import SparkContext
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.storagelevel import StorageLevel
from operator import add
from itertools import groupby
#
# Adopted from spark's doc: http://spark.apache.org/docs/latest/mllib-naive-bayes.html
#
def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: bayes <file>"
        exit(-1)
    sc = SparkContext(appName="PythonNaiveBayes")
    filename = sys.argv[1]
    data = sc.sequenceFile(filename, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")

    wordCount = data                                    \
        .flatMap(lambda (key, doc): doc.split(" "))     \
        .map(lambda x: (x, 1))                          \
        .reduceByKey(add)
    wordSum = wordCount.map(lambda x: x[1]).reduce(lambda x, y: x + y)
    wordDict = wordCount.zipWithIndex()                 \
        .map(lambda ((key, count), index): (key, (index, count * 1.0 / wordSum))) \
        .collectAsMap()
    sharedWordDict = sc.broadcast(wordDict)

    # for each document, generate vector based on word freq
    def doc2vector(dockey, doc):
        # map to word index: freq
        # combine freq with same word
        docVector = [(key, sum((z[1] for z in values))) for key, values in
                     groupby(sorted([sharedWordDict.value[x] for x in doc.split(" ")],
                                    key=lambda x: x[0]),
                             key=lambda x: x[0])]
        (indices, values) = zip(*docVector)      # unzip
        label = float(dockey[6:])
        return label, indices, values

    vector = data.map(lambda (dockey, doc): doc2vector(dockey, doc))
    vector.persist(StorageLevel.MEMORY_ONLY)
    d = vector.map(lambda (label, indices, values): indices[-1] if indices else 0) \
        .reduce(lambda a, b: max(a, b)) + 1

#    print "###### Load svm file", filename
#    examples = MLUtils.loadLibSVMFile(sc, filename, numFeatures = numFeatures)
    examples = vector.map(lambda (label, indices, values): LabeledPoint(label, Vectors.sparse(d, indices, values)))
    examples.cache()

    # FIXME: need randomSplit!
    training = examples.sample(False, 0.8, 2)
    test = examples.sample(False, 0.2, 2)
    numTraining = training.count()
    numTest = test.count()
    print " numTraining = %d, numTest = %d." % (numTraining, numTest)

    model = NaiveBayes.train(training, 1.0)
    model_share = sc.broadcast(model)
    predictionAndLabel = test.map(lambda x: (x.label, model_share.value.predict(x.features)))
#    prediction = model.predict(test.map( lambda x: x.features ))
#    predictionAndLabel = prediction.zip(test.map( lambda x:x.label ))
    accuracy = predictionAndLabel.filter(lambda x: x[0] == x[1]).count() * 1.0 / numTest
    print "Test accuracy = %s." % accuracy

2.4 Scala code analysis

run-spark-job org.apache.spark.examples.mllib.SparseNaiveBayes ${INPUT_HDFS}

Clearly, the Scala naive Bayes workload simply invokes the SparseNaiveBayes example that ships with the Spark MLlib library.
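
For reference, MLlib's naive Bayes is a multinomial model with additive (Laplace) smoothing; the Python and Java variants in Sections 2.3 and 2.5 pass the smoothing parameter 1.0 explicitly. Roughly, the smoothed estimate for term t in class c is

    P(t | c) = (N_ct + lambda) / (sum over t' of N_ct' + lambda * |V|),   with lambda = 1.0

where N_ct is the total weight of term t over the training documents of class c and |V| is the vocabulary size.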

 
 

2.5 Java code analysis

run-spark-job com.intel.sparkbench.bayes.JavaBayes ${INPUT_HDFS}

Somewhat unexpectedly, HiBench does not reuse an existing program or jar for the Java variant; it ships its own implementation. 
The code is listed below; a detailed walk-through is left for later:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.intel.sparkbench.bayes;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel;
import scala.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.hadoop.io.Text;

import java.lang.Boolean;
import java.lang.Double;
import java.lang.Long;
import java.util.*;
import java.util.regex.Pattern;

/*
 * Adopted from spark's doc: http://spark.apache.org/docs/latest/mllib-naive-bayes.html
 */
public final class JavaBayes {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JavaBayes <file>");
            System.exit(1);
        }
        Random rand = new Random();
        SparkConf sparkConf = new SparkConf().setAppName("JavaBayes");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//      int numFeatures = Integer.parseInt(args[1]);

        // Generate vectors according to input documents
        JavaPairRDD<String, String> data = ctx.sequenceFile(args[0], Text.class, Text.class)
            .mapToPair(new PairFunction<Tuple2<Text, Text>, String, String>() {
                @Override
                public Tuple2<String, String> call(Tuple2<Text, Text> e) {
                    return new Tuple2<String, String>(e._1().toString(), e._2().toString());
                }
            });

        JavaPairRDD<String, Long> wordCount = data
            .flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterable<String> call(Tuple2<String, String> e) {
                    return Arrays.asList(SPACE.split(e._2()));
                }
            })
            .mapToPair(new PairFunction<String, String, Long>() {
                @Override
                public Tuple2<String, Long> call(String e) {
                    return new Tuple2<String, Long>(e, 1L);
                }
            })
            .reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long i1, Long i2) {
                    return i1 + i2;
                }
            });

        final Long wordSum = wordCount
            .map(new Function<Tuple2<String, Long>, Long>() {
                @Override
                public Long call(Tuple2<String, Long> e) {
                    return e._2();
                }
            })
            .reduce(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1 + v2;
                }
            });

        List<Tuple2<String, Tuple2<Long, Double>>> wordDictList = wordCount.zipWithIndex()
            .map(new Function<Tuple2<Tuple2<String, Long>, Long>, Tuple2<String, Tuple2<Long, Double>>>() {
                @Override
                public Tuple2<String, Tuple2<Long, Double>> call(Tuple2<Tuple2<String, Long>, Long> e) throws Exception {
                    String key = e._1()._1();
                    Long count = e._1()._2();
                    Long index = e._2();
                    return new Tuple2<String, Tuple2<Long, Double>>(key,
                        new Tuple2<Long, Double>(index, count.doubleValue() / wordSum));
                }
            }).collect();

        Map<String, Tuple2<Long, Double>> wordDict = new HashMap();
        for (Tuple2<String, Tuple2<Long, Double>> item : wordDictList) {
            wordDict.put(item._1(), item._2());
        }
        final Broadcast<Map<String, Tuple2<Long, Double>>> sharedWordDict = ctx.broadcast(wordDict);

        // for each document, generate vector based on word freq
        JavaRDD<Tuple3<Double, Long[], Double[]>> vector = data
            .map(new Function<Tuple2<String, String>, Tuple3<Double, Long[], Double[]>>() {
                @Override
                public Tuple3<Double, Long[], Double[]> call(Tuple2<String, String> v1) throws Exception {
                    String dockey = v1._1();
                    String doc = v1._2();
                    String[] keys = SPACE.split(doc);

                    Tuple2<Long, Double>[] datas = new Tuple2[keys.length];
                    for (int i = 0; i < keys.length; i++) {
                        datas[i] = sharedWordDict.getValue().get(keys[i]);
                    }

                    Map<Long, Double> vector = new HashMap<Long, Double>();
                    for (int i = 0; i < datas.length; i++) {
                        Long indic = datas[i]._1();
                        Double value = datas[i]._2();
                        if (vector.containsKey(indic)) {
                            vector.put(indic, value + vector.get(indic));
                        } else {
                            vector.put(indic, value);
                        }
                    }

                    Long[] indices = new Long[vector.size()];
                    Double[] values = new Double[vector.size()];
                    SortedSet<Long> sortedKeys = new TreeSet<Long>(vector.keySet());
                    int c = 0;
                    for (Long key : sortedKeys) {
                        indices[c] = key;
                        values[c] = vector.get(key);
                        c += 1;
                    }
                    Double label = Double.parseDouble(dockey.substring(6));
                    return new Tuple3<Double, Long[], Double[]>(label, indices, values);
                }
            });
        vector.persist(StorageLevel.MEMORY_ONLY());

        final Long d = vector
            .map(new Function<Tuple3<Double, Long[], Double[]>, Long>() {
                @Override
                public Long call(Tuple3<Double, Long[], Double[]> v1) throws Exception {
                    Long[] indices = v1._2();
                    if (indices.length > 0) {
//                      System.out.println("v_length:" + indices.length + "  v_val:" + indices[indices.length - 1]);
                        return indices[indices.length - 1];
                    } else return Long.valueOf(0);
                }
            })
            .reduce(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long v1, Long v2) throws Exception {
//                  System.out.println("v1:" + v1 + "  v2:" + v2);
                    return v1 > v2 ? v1 : v2;
                }
            }) + 1;

        RDD<LabeledPoint> examples = vector
            .map(new Function<Tuple3<Double, Long[], Double[]>, LabeledPoint>() {
                @Override
                public LabeledPoint call(Tuple3<Double, Long[], Double[]> v1) throws Exception {
                    int intIndices[] = new int[v1._2().length];
                    double intValues[] = new double[v1._3().length];
                    for (int i = 0; i < v1._2().length; i++) {
                        intIndices[i] = v1._2()[i].intValue();
                        intValues[i] = v1._3()[i];
                    }
                    return new LabeledPoint(v1._1(), Vectors.sparse(d.intValue(), intIndices, intValues));
                }
            }).rdd();
        //RDD<LabeledPoint> examples = MLUtils.loadLibSVMFile(ctx.sc(), args[0], false, numFeatures);

        RDD<LabeledPoint>[] split = examples.randomSplit(new double[]{0.8, 0.2}, rand.nextLong());
        JavaRDD<LabeledPoint> training = split[0].toJavaRDD();
        JavaRDD<LabeledPoint> test = split[1].toJavaRDD();

        final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);

        JavaRDD<Double> prediction = test.map(new Function<LabeledPoint, Double>() {
            @Override
            public Double call(LabeledPoint p) {
                return model.predict(p.features());
            }
        });
        JavaPairRDD<Double, Double> predictionAndLabel = prediction.zip(test.map(new Function<LabeledPoint, Double>() {
            @Override
            public Double call(LabeledPoint p) {
                return p.label();
            }
        }));

        double accuracy = (double) predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
            @Override
            public Boolean call(Tuple2<Double, Double> pl) {
                return pl._1().equals(pl._2());
            }
        }).count() / test.count();

        System.out.println(String.format("Test accuracy = %f", accuracy));
        ctx.stop();
    }
}

3. Run results

Type Date Time Input_data_size Duration(s) Throughput(bytes/s) Throughput/node
JavaSparkBayes 2016-10-09 16:41:09 113387030 48.857 2320793 2320793
ScalaSparkBayes 2016-10-09 16:42:00 113387030 45.164 2510562 2510562
PythonSparkBayes 2016-10-09 16:44:03 113387030 118.521 956683 956683
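
The Throughput column is simply Input_data_size divided by Duration, and Throughput/node equals Throughput in this run (apparently a single node). A quick sanity check (my own arithmetic, not produced by HiBench):

# Verify throughput = input_size / duration for the three report rows above.
rows = {"JavaSparkBayes":   (113387030, 48.857),
        "ScalaSparkBayes":  (113387030, 45.164),
        "PythonSparkBayes": (113387030, 118.521)}
for name, (size_bytes, duration_s) in rows.items():
    print(name, int(size_bytes / duration_s))   # ~2320793, ~2510562, ~956683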

Reference data-scale settings for the Bayes workload:

#Bayes 
hibench.bayes.tiny.pages 25000 
hibench.bayes.tiny.classes 10 
hibench.bayes.tiny.ngrams 1 
hibench.bayes.small.pages 30000 
hibench.bayes.small.classes 100 
hibench.bayes.small.ngrams 2 
hibench.bayes.large.pages 100000 
hibench.bayes.large.classes 100 
hibench.bayes.large.ngrams 2 
hibench.bayes.huge.pages 500000 
hibench.bayes.huge.classes 100 
hibench.bayes.huge.ngrams 2 
hibench.bayes.gigantic.pages 1000000 
hibench.bayes.gigantic.classes 100 
hibench.bayes.gigantic.ngrams 2 
hibench.bayes.bigdata.pages 20000000 
hibench.bayes.bigdata.classes 20000 
hibench.bayes.bigdata.ngrams 2


References

https://github.com/intel-hadoop/HiBench
