A Hadoop-Based Product Recommendation System

Recommendation result = user purchase vector × item similarity matrix

Item similarity: the number of times two items co-occur (Euclidean distance or other measures could be used instead).
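To make the formula concrete, here is a minimal stand-alone Java sketch (not part of the MapReduce pipeline below; the item IDs come from the sample data, the counts are illustrative) that scores items for one user by multiplying a purchase vector with a co-occurrence matrix. The MapReduce jobs in the rest of the article compute the same two inputs and the same product, only in a distributed way.

import java.util.HashMap;
import java.util.Map;

public class RecommendSketch {
    public static void main(String[] args) {
        // A few rows of the co-occurrence matrix: row item -> (column item -> co-occurrence count).
        Map<String, Map<String, Integer>> matrix = new HashMap<>();
        matrix.put("20001", row("20001", 3, "20002", 2, "20005", 2));
        matrix.put("20002", row("20001", 2, "20002", 3, "20005", 2));
        matrix.put("20005", row("20001", 2, "20002", 2, "20005", 2));

        // Purchase vector of one user: item -> number of purchases.
        Map<String, Integer> userVector = row("20001", 1, "20005", 1);

        // score(item) = sum over bought items of matrix[bought][item] * purchases(bought).
        Map<String, Integer> scores = new HashMap<>();
        for (Map.Entry<String, Integer> bought : userVector.entrySet()) {
            Map<String, Integer> matrixRow = matrix.get(bought.getKey());
            if (matrixRow == null) continue;
            for (Map.Entry<String, Integer> col : matrixRow.entrySet()) {
                int add = col.getValue() * bought.getValue();
                Integer old = scores.get(col.getKey());
                scores.put(col.getKey(), old == null ? add : old + add);
            }
        }
        for (Map.Entry<String, Integer> e : scores.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }

    // Small helper to build a map from alternating id/count arguments.
    private static Map<String, Integer> row(Object... kv) {
        Map<String, Integer> m = new HashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put((String) kv[i], (Integer) kv[i + 1]);
        }
        return m;
    }
}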

Preliminaries

1. Project name: GRMS

2. Add the Maven dependencies in pom.xml (shown below).

3. Create the packages:

com.briup.bigdata.project.grms

|--step1

|--step2

|--...

|--utils

4. Copy the four XML configuration files from the cluster into the resources directory.

5. Create the following directory structure under the root of the HDFS cluster:

/grms

|--rawdata/matrix.txt

|--step1

|--...

6. Initial data: matrix.txt

10001      20001      1

10001      20002      1

10001      20005      1

10001      20006      1

10001      20007      1

10002      20003      1

10002      20004      1

10002      20006      1

10003      20002      1

10003      20007      1

10004      20001      1

10004      20002      1

10004      20005      1

10004      20006      1

10005      20001      1

10006      20004      1

10006      20007      1

IDs starting with 1000 are user IDs, IDs starting with 2000 are product IDs, and the last column is the purchase count.
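Assuming the Hadoop client configuration is on the classpath and matrix.txt is in the working directory, a small sketch like the following (a hypothetical helper, not part of the project code) can create /grms/rawdata and upload the file; the command-line equivalents are hdfs dfs -mkdir -p /grms/rawdata and hdfs dfs -put matrix.txt /grms/rawdata/matrix.txt.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadMatrix {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // picks up core-site.xml/hdfs-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);
        fs.mkdirs(new Path("/grms/rawdata"));          // create the input directory
        fs.copyFromLocalFile(new Path("matrix.txt"),   // local file, assumed to be in the working directory
                new Path("/grms/rawdata/matrix.txt"));
        fs.close();
    }
}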

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.briup.bigdata.project.grms</groupId>
    <artifactId>GRMS</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.8.3</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.3</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.8.3</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.8.3</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>2.8.3</version></dependency>
        <dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency>
        <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency>
        <dependency><groupId>commons-configuration</groupId><artifactId>commons-configuration</artifactId><version>1.9</version></dependency>
        <dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.5</version></dependency>
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.2.6</version></dependency>
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.6</version></dependency>
    </dependencies>
    <build>
        <finalName>grms</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

// Adjust the version numbers above to match the versions used in your own environment.

8. Compute each user's purchased-goods list

File: UserBuyGoodsList.java

Classes:

UserBuyGoodsList

UserBuyGoodsListMapper

UserBuyGoodsListReducer

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

import java.io.IOException;
import java.util.Iterator;

public class UserBuyGoodsList extends Configured implements Tool {

    // Map: emit (user id, goods id) for every purchase record.
    static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tuple = value.toString().split("\t");
            context.write(new Text(tuple[0]), new Text(tuple[1]));
        }
    }

    // Reduce: join all goods bought by the same user into one comma-separated list.
    static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            StringBuilder builder = new StringBuilder();
            while (iterator.hasNext()) {
                builder.append(iterator.next().toString()).append(",");
            }
            String result = builder.substring(0, builder.length() - 1);   // drop the trailing comma
            context.write(key, new Text(result));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(UserBuyGoodsList.class);
        job.setMapperClass(UserBuyGoodsListMapper.class);
        job.setReducerClass(UserBuyGoodsListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new UserBuyGoodsList(), args));
    }
}

How it works: the map side reads the raw data line by line and splits each line on "\t" into an array, so the user ID goes into tuple[0] and the goods ID into tuple[1]; each pair is handed to the reduce side. The reducer walks the values that share a key with an iterator and appends each one to a StringBuilder, separated by commas (plain String concatenation is not recommended here, because every "+" creates a new string and wastes memory on large data sets). When writing to the context, substring() drops the final character, since that trailing comma carries no meaning.

The job is configured inside the run() method. Doing this by hand is fairly tedious, because every new class has to be configured all over again; at the end of this article a utility class is introduced that gathers the whole configuration in one place, which makes reconfiguration easier and more readable.

Result:

10001      20001,20005,20006,20007,20002

10002      20006,20003,20004

10003      20002,20007

10004      20001,20002,20005,20006

10005      20001

10006      20004,20007

9. Compute the goods co-occurrence counts

File: GoodsConcurrenceList.java

Classes: GoodsConcurrenceList

GoodsConcurrenceListMapper

GoodsConcurrenceListReducer

Input: the result of step 1 (UserBuyGoodsList).

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsConcurrenceList extends Configured implements Tool {

    private final static Text K = new Text();
    private final static IntWritable V = new IntWritable(1);

    // Map: for every user's goods list, emit every ordered pair of goods that co-occur, with a count of 1.
    static class GoodsConcurrenceListMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            String[] items = tokens[1].split(",");
            for (int i = 0; i < items.length; i++) {
                String itemA = items[i];
                for (int j = 0; j < items.length; j++) {
                    String itemB = items[j];
                    K.set(itemA + "\t" + itemB);
                    context.write(K, V);
                }
            }
        }
    }

    // Reduce: add up the counts of each goods pair to get its co-occurrence count.
    static class GoodsConcurrenceListReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            V.set(sum);
            context.write(key, V);
        }
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        job.setJarByClass(GoodsConcurrenceList.class);
        job.setMapperClass(GoodsConcurrenceListMapper.class);
        job.setReducerClass(GoodsConcurrenceListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GoodsConcurrenceList(), args));
    }
}

How it works: for convenience two static final fields are defined, K and V, although the results could just as well be written to the context directly. Each line of the UserBuyGoodsList result is read and split on "\t"; the co-occurrence matrix does not need the user ID, so tokens[1] is split again on "," into the array items[]. A nested for loop then pairs the goods IDs: every time an itemA is matched with an itemB, the pair becomes the key K and the count V (fixed at 1) is emitted to the reduce side. Both loop indices start at 0, so every item is also paired with itself; these self-pairs can be removed later, or simply ignored during the matrix multiplication. The map output therefore looks like "20001 20001 1", "20001 20001 1", and so on, which means the reduce side only has to accumulate the counts of each pair to form the co-occurrence counts: a for loop adds every value to sum, which is then written out.

Result:

20001      20001      3

20001      20002      2

(The rest of the output is omitted to save space.)

10. Compute the goods co-occurrence matrix

File: GoodsConcurrenceMatrix.java

Classes: GoodsConcurrenceMatrix

GoodsConcurrenceMatrixMapper

GoodsConcurrenceMatrixReducer

The co-occurrence counts are arranged into matrix rows for the multiplication step.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsConcurrenceMatrix extends Configured implements Tool {

    // Map: turn "goodsA \t goodsB \t count" into (goodsA, "goodsB:count").
    static class GoodsConcurrenceMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            String[] s = value.toString().split("\t");
            sb.append(s[1]).append(":").append(s[2]);
            context.write(new Text(s[0]), new Text(sb.toString()));
        }
    }

    // Reduce: join all "goodsB:count" entries of the same goodsA into one row of the co-occurrence matrix.
    static class GoodsConcurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder builder = new StringBuilder();
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                builder.append(iterator.next()).append(",");
            }
            context.write(key, new Text(builder.substring(0, builder.length() - 1)));
        }
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        job.setJarByClass(GoodsConcurrenceMatrix.class);
        job.setMapperClass(GoodsConcurrenceMatrixMapper.class);
        job.setReducerClass(GoodsConcurrenceMatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GoodsConcurrenceMatrix(), args));
    }
}

How it works: the co-occurrence counts are already computed, so the mapper simply appends the last two fields as "goodsID:count" and emits them under the first goods ID as the key; the reducer then only has to join all values with the same key using "," (a while loop over the iterator collects every entry) and writes one matrix row per goods ID.

Result:
            20001    20001:3,20002:2,20005:2,20006:2,20007:1
            20002    20001:2,20002:3,20005:2,20006:2,20007:2
            20003    20003:1,20004:1,20006:1
            20004    20003:1,20004:2,20006:1,20007:1
            20005    20001:2,20002:2,20005:2,20006:2,20007:1
            20006    20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
            20007    20001:1,20002:2,20004:1,20005:1,20006:1,20007:3

11. Compute the user purchase vectors

File: UserBuyGoodsVector.java

Classes: UserBuyGoodsVector

UserBuyGoodsVectorMapper

UserBuyGoodsVectorReducer

Input: the result of step 1 (or the raw data).

10001  20001,20005,20006,20007,20002

10002   20006,20003,20004

10003   20002,20007

10004   20001,20002,20005,20006

10005  20001

10006   20004,20007

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

import java.io.IOException;

// The input is the result of UserBuyGoodsList.
public class UserBuyGoodsVector extends Configured implements Tool {

    // Map: for each goods ID in a user's purchase list, emit (goods id, "user id:1").
    static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            String[] vs = s[1].split(",");
            for (String v : vs) {
                context.write(new Text(v), new Text(s[0] + ":1"));
            }
        }
    }

    // Reduce: join all "user:1" entries of the same goods ID into one purchase-vector column.
    static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString()).append(",");
            }
            context.write(key, new Text(sb.substring(0, sb.length() - 1)));
        }
    }

    public int run(String[] strings) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(UserBuyGoodsVector.class);
        job.setMapperClass(UserBuyGoodsVectorMapper.class);
        job.setReducerClass(UserBuyGoodsVectorReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new UserBuyGoodsVector(), args));
    }
}

How it works: if this cannot be done in one pass, two chained MapReduce jobs can achieve the same result; the code here keeps it as simple as possible.

The split line is kept in two string arrays: s holds the user ID and the goods list, and s[1] is split on "," into vs. Each goods ID in vs is used as a key, with s[0] plus ":1" (that user, counted once) as the value sent to the reduce side. In effect, every product records each user that bought it once; the reducer then merges these records by appending all values with the same key to a StringBuilder, separated by commas.

Result:

20001      10001:1,10004:1,10005:1

20002      10001:1,10003:1,10004:1

20003      10002:1

20004      10002:1,10006:1

20005      10001:1,10004:1

20006      10001:1,10002:1,10004:1

20007      10001:1,10003:1,10006:1

12. Multiply the goods co-occurrence matrix by the user purchase vectors to form intermediate recommendation results

File: MultiplyGoodsMatrixAndUserVector.java

Classes: MultiplyGoodsMatrixAndUserVectorFirstMapper

MultiplyGoodsMatrixAndUserVectorSecondMapper

MultiplyGoodsMatrixAndUserVectorReducer

Note: the input comes from two files, the result of step 3 (the goods co-occurrence matrix) and the result of step 4 (the user purchase vectors). A single MR job therefore uses two custom Mappers, one per input, and a single custom Reducer that combines their intermediate results.

1. The two Mappers must emit the same keys.

2. The two Mappers must emit the same key and value data types.

3. In the job configuration, each Mapper is registered with MultipleInputs.addInputPath(job, input path, input format class, Mapper class).

Input: the results of step 3 and step 4.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MultiplyGoodsMatrixAndUserVector extends Configured implements Tool {

    // First mapper: reads the co-occurrence matrix; values are tagged with "m".
    static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]), new Text("m" + s[1]));
        }
    }

    // Second mapper: reads the user purchase vectors; values are tagged with "v".
    static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]), new Text("v" + s[1]));
        }
    }

    // Reduce: multiply every matrix entry with every vector entry that shares the same goods ID.
    static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[] ms = null;
            String[] vs = null;
            for (Text value : values) {
                String str = value.toString();
                if (str.charAt(0) == 'm') {
                    ms = str.substring(1).split(",");
                }
                if (str.charAt(0) == 'v') {
                    vs = str.substring(1).split(",");
                }
            }
            for (String m : ms) {
                for (String v : vs) {
                    String[] mss = m.split(":");
                    String[] vss = v.split(":");
                    long vv = Long.parseLong(vss[1]);
                    long mm = Long.parseLong(mss[1]);
                    context.write(new Text(vss[0] + "," + mss[0]), new Text((vv * mm) + ""));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MultiplyGoodsMatrixAndUserVector(), args));
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path in1 = new Path(conf.get("in1"));
        Path in2 = new Path(conf.get("in2"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        MultipleInputs.addInputPath(job, in1, TextInputFormat.class, MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        MultipleInputs.addInputPath(job, in2, TextInputFormat.class, MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MultiplyGoodsMatrixAndUserVectorReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

How it works: to multiply the two matrices, both Mappers pull out the goods ID as the key and send the rest of the line as the value to the reduce side, tagging the values with "m" (matrix row) or "v" (purchase vector) so the reducer can tell them apart. In the reducer, the value starting with 'm' has the tag stripped and is split on ',' into ms; vs is built the same way from the 'v' value. Each entry still contains a ":", and the part before it (a goods or user ID) does not take part in the multiplication, so a nested loop splits every entry on ":", parses the parts after it as longs and multiplies them, while the IDs in front are combined into the output key "user,goods". For example, for key 20001 the reducer receives the matrix row 20001:3,20002:2,... and the vector 10001:1,10004:1,10005:1, and emits (10001,20001) 3, (10004,20001) 3, and so on.

Because this job takes two map inputs, the run() method must register them with MultipleInputs.addInputPath.

Result:

10001,20001   2

10001,20001   2

10001,20001   3

10001,20001   1

10001,20001   2

10001,20002   3

10001,20002   2

10001,20002   2

(The rest of the output is omitted to save space.)

13. Sum the scattered partial products from step 5

File: MakeSumForMultiplication.java

Classes: MakeSumForMultiplicationMapper

MakeSumForMultiplicationReducer

Input: the result of step 5.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MakeSumForMultiplication extends Configured implements Tool {

    // Map: emit ("user,goods", partial product) for every line of the previous step.
    static class MakeSumForMultiplicationMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1])));
        }
    }

    // Reduce: add up all partial products of the same (user, goods) pair.
    static class MakeSumForMultiplicationReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(MakeSumForMultiplication.class);
        job.setMapperClass(MakeSumForMultiplicationMapper.class);
        job.setReducerClass(MakeSumForMultiplicationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MakeSumForMultiplication(), args));
    }
}

How it works: this is essentially a word count. Each line is split on "\t"; the "user,goods" pair becomes the key and the partial product (parsed as an int) becomes the value, and the reducer adds up all values with the same key.

Result:

10001,20001   10

10001,20002   11

10001,20003   1

10001,20004   2

10001,20005   9

(The rest of the output is omitted to save space.)

14. Deduplicate: remove goods the user has already bought from the recommendation results
        File: DuplicateDataForResult.java
        Classes: DuplicateDataForResultFirstMapper
              DuplicateDataForResultSecondMapper
              DuplicateDataForResultReducer
        Inputs:
            1. FirstMapper handles the user purchase list data (the result of step 1).

2. SecondMapper handles the recommendation results from MakeSumForMultiplication.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

import java.io.IOException;
import java.util.*;

public class DuplicateDataForResult extends Configured implements Tool {

    // First mapper: reads the user purchase lists and emits ("user,goods", "r...") for every goods the user already bought.
    static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("[\t]");
            for (String s : strs[1].split(",")) {
                context.write(new Text(strs[0] + "," + s), new Text("r" + value.toString()));
            }
        }
    }

    // Second mapper: reads the summed recommendation results and emits ("user,goods", "u" + whole line).
    static class DuplicateDataForResultSecondMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("[\t]");
            context.write(new Text(strs[0]), new Text("u" + value.toString()));
        }
    }

    // Reduce: drop every recommendation whose (user, goods) key also appears in the purchase lists.
    static class DuplicateDataForResultReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> map = new HashMap<String, String>();
            List<String> list = new ArrayList<String>();
            for (Text value : values) {
                String val = value.toString();
                if (val.charAt(0) == 'r') list.add(key.toString());
                if (val.charAt(0) == 'u') map.put(key.toString(), val.substring(1));
            }
            for (String str : list) {
                map.remove(str);
            }
            for (String str : map.keySet()) {
                String[] strs = map.get(str).split(",");
                context.write(new Text(strs[0]), new Text(strs[1]));
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in1 = new Path(conf.get("in1"));
        Path in2 = new Path(conf.get("in2"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        MultipleInputs.addInputPath(job, in1, TextInputFormat.class, DuplicateDataForResultFirstMapper.class);
        MultipleInputs.addInputPath(job, in2, TextInputFormat.class, DuplicateDataForResultSecondMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(DuplicateDataForResultReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, out);
        job.setNumReduceTasks(1);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

How it works: for every record coming from UserBuyGoodsList, the first mapper joins the user ID and each goods ID that user bought with "," and uses that as the key, so the keys match those produced from the MakeSumForMultiplication output. Because two Mappers feed the same reducer, their values are tagged "r" (already bought) and "u" (recommendation) to tell the two sources apart. In the reducer, the already-bought keys are collected in a List and the recommendations in a HashMap keyed by the same "user,goods" string; the List then acts as an index into the Map, and every entry whose key appears in the List is removed, because the user already owns that product and it should not be recommended. What remains in the Map is the data we want. Since the next step stores the data in a database, the "," is no longer needed in the output: a loop over map.keySet() splits each remaining value and writes the user ID as the key and the rest as the value.

Result:

10001    20004    2
10001    20003    1
10002    20002    2
10002    20007    2
10002    20001    2
10002    20005    2
10003    20006    3
10003    20005    3
10003    20001    3
10003    20004    1
10004    20007    5
10004    20004    1
10004    20003    1
10005    20006    2
10005    20002    2
10005    20005    2
10005    20007    1
10006    20006    2
10006    20002    2
10006    20005    1
10006    20003    1
10006    20001    1
15. Save the recommendation results to a MySQL database
        Notes:
            a. Make sure the table exists in advance:
                grms.results(uid varchar(20),
                gid varchar(20),
                exp int)
            b. When an MR program writes data from the HDFS cluster into MySQL, only the final output key can be saved to the database.
            c. Define a custom data type for the final output key. The custom class implements WritableComparable<itself>; because it also serves as the key that moves data from HDFS into MySQL, it must implement the DBWritable interface as well, i.e. provide
                readFields(ResultSet rs)
                write(PreparedStatement ps)
            Sketch:
                class A implements WritableComparable<A>, DBWritable {
                    private String uid;
                    private String gid;
                    private int exp;

                    public void readFields(ResultSet rs) {
                        uid = rs.getString(1);
                        // ...
                    }

                    public void write(PreparedStatement ps) {
                        ps.setString(1, uid);
                        ps.setString(2, gid);
                        ps.setInt(3, exp);
                    }
                }
            d. In the job configuration, use DBConfiguration.configureDB() to supply the database connection parameters:
                parameter 1: the configuration object of the current job, obtained from the Job object;
                parameter 2: "com.mysql.jdbc.Driver"
                parameter 3: "jdbc:mysql://ip:port/grms"
                parameters 4 and 5: the user name and the password.
            e. The output format must be DBOutputFormat.
                DBOutputFormat.setOutput() takes three kinds of arguments:
                    parameter 1: the Job object;
                    parameter 2: the database table name;
                    parameter 3: a varargs list of the column names to insert into.
                    The generated statement is: insert into <table name> values(?,?,?);
        File: SaveRecommendResultToDB.java
        Classes: SaveRecommendResultToDBMapper<LongWritable,Text,Text,Text>
              SaveRecommendResultToDBReducer<Text,Text,custom key,NullWritable>
        Input: the result of step 7.
        Output: the MySQL database, grms.results

package com.briup.bigdata.project.grms;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class SaveRecommendResultToDB extends Configured implements Tool {

    // Map: emit ("uid \t gid", exp) for every recommendation line.
    static class SaveRecommendResultToDBMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            context.write(new Text(strs[0] + "\t" + strs[1]), new Text(strs[2]));
        }
    }

    // Reduce: wrap each record in a RecommendResultDB key; DBOutputFormat writes the key to MySQL.
    static class SaveRecommendResultToDBReducer extends Reducer<Text, Text, RecommendResultDB, NullWritable> {
        private RecommendResultDB rrdb = new RecommendResultDB();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[] strs = key.toString().split("\t");
            rrdb.setUid(strs[0]);
            rrdb.setGid(strs[1]);
            rrdb.setExp(Integer.parseInt(values.iterator().next().toString()));
            context.write(rrdb, NullWritable.get());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        job.setMapperClass(SaveRecommendResultToDBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, in);
        job.setReducerClass(SaveRecommendResultToDBReducer.class);
        job.setOutputKeyClass(RecommendResultDB.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        Properties prop = new Properties();
        prop.load(this.getClass().getResourceAsStream("/db.properties"));
        DBConfiguration.configureDB(job.getConfiguration(),
                prop.getProperty("grms.driver"),
                prop.getProperty("grms.url"),
                prop.getProperty("grms.username"),
                prop.getProperty("grms.password"));
        DBOutputFormat.setOutput(job, prop.getProperty("grms.tblname"), "uid", "gid", "exp");
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.briup.bigdata.project.grms;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Custom key type: WritableComparable for the MR framework, DBWritable so DBOutputFormat can write it to MySQL.
public class RecommendResultDB implements DBWritable, WritableComparable<RecommendResultDB> {

    private String uid;
    private String gid;
    private int exp;

    public RecommendResultDB() {}

    public RecommendResultDB(String uid, String gid, int exp) {
        this.uid = uid;
        this.gid = gid;
        this.exp = exp;
    }

    @Override
    public int compareTo(RecommendResultDB o) {
        int uidComp = this.uid.compareTo(o.uid);
        int gidComp = this.gid.compareTo(o.gid);
        int indexComp = this.exp - o.exp;
        return uidComp == 0 ? (gidComp == 0 ? indexComp : gidComp) : uidComp;
    }

    // Serialization between map and reduce tasks.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeUTF(gid);
        out.writeInt(exp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readUTF();
        gid = in.readUTF();
        exp = in.readInt();
    }

    // Serialization to and from the database.
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, uid);
        preparedStatement.setString(2, gid);
        preparedStatement.setInt(3, exp);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        if (resultSet == null) return;
        uid = resultSet.getString(1);
        gid = resultSet.getString(2);
        exp = resultSet.getInt(3);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RecommendResultDB)) return false;
        RecommendResultDB that = (RecommendResultDB) o;
        return getExp() == that.getExp()
                && Objects.equals(getUid(), that.getUid())
                && Objects.equals(getGid(), that.getGid());
    }

    @Override
    public int hashCode() { return Objects.hash(getUid(), getGid(), getExp()); }

    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
    public String getGid() { return gid; }
    public void setGid(String gid) { this.gid = gid; }
    public int getExp() { return exp; }
    public void setExp(int exp) { this.exp = exp; }

    @Override
    public String toString() {
        return "RecommendResultDB{" + "uid='" + uid + '\'' + ", gid='" + gid + '\'' + ", exp=" + exp + '}';
    }
}

How it works: a RecommendResultDB class is designed as the output key; in the reduce side of SaveRecommendResultToDB, the three strings split out by the map side are stored into the database in the order uid, gid, exp.

16. Build a job-flow object (JobControl) so the program submits the jobs on its own

File: GoodsRecommendationManagementSystemJobController.java

Class: GoodsRecommendationManagementSystemJobController

1. Nine classes were designed above; one of them, RecommendResultDB, is only the custom output key for SaveRecommendResultToDB and has no input or output of its own. To configure everything in one place, create Job objects for step 1 through step 8 and configure each of them.

2. Create eight ControlledJob objects and wrap the Job objects from the previous step as controllable jobs.

3. Add the dependency relationships between the controllable jobs.

4. Build a JobControl object and add the eight controllable jobs one by one.

5. Build a thread object, start it, and let it run the jobs.

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Properties;

public class GoodsRecommendationManagementSystemJobController extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path in1 = new Path(conf.get("in1"));
        Path out1 = new Path(conf.get("out1"));
        Path out2 = new Path(conf.get("out2"));
        Path out3 = new Path(conf.get("out3"));
        Path out4 = new Path(conf.get("out4"));
        Path out5 = new Path(conf.get("out5"));
        Path out6 = new Path(conf.get("out6"));
        Path out7 = new Path(conf.get("out7"));

        // -- step 1: user purchase lists --
        Job job1 = Job.getInstance(conf, UserBuyGoodsList.class.getSimpleName());
        job1.setJarByClass(this.getClass());
        job1.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job1, in1);
        job1.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        job1.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job1, out1);

        // -- step 2: goods co-occurrence counts (input: result of step 1) --
        Job job2 = Job.getInstance(conf, GoodsConcurrenceList.class.getSimpleName());
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsConcurrenceList.GoodsConcurrenceListMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(IntWritable.class);
        job2.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job2, out1);
        job2.setReducerClass(GoodsConcurrenceList.GoodsConcurrenceListReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2, out2);

        // -- step 3: goods co-occurrence matrix (input: result of step 2) --
        Job job3 = Job.getInstance(conf, GoodsConcurrenceMatrix.class.getSimpleName());
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsConcurrenceMatrix.GoodsConcurrenceMatrixMapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job3, out2);
        job3.setReducerClass(GoodsConcurrenceMatrix.GoodsConcurrenceMatrixReducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3, out3);

        // -- step 4: user purchase vectors (input: result of step 1 or the raw data) --
        Job job4 = Job.getInstance(conf, UserBuyGoodsVector.class.getSimpleName());
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(Text.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4, out1);
        job4.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(Text.class);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4, out4);

        // -- step 5: matrix * vector (inputs: results of step 3 and step 4) --
        Job job5 = Job.getInstance(conf, MultiplyGoodsMatrixAndUserVector.class.getSimpleName());
        job5.setJarByClass(this.getClass());
        MultipleInputs.addInputPath(job5, out3, TextInputFormat.class, MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        MultipleInputs.addInputPath(job5, out4, TextInputFormat.class, MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5, out5);

        // -- step 6: sum the partial products (input: result of step 5) --
        Job job6 = Job.getInstance(conf, MakeSumForMultiplication.class.getSimpleName());
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(IntWritable.class);
        job6.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job6, out5);
        job6.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(IntWritable.class);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6, out6);

        // -- step 7: remove goods the user already bought (inputs: results of step 1 and step 6) --
        Job job7 = Job.getInstance(conf, DuplicateDataForResult.class.getSimpleName());
        job7.setJarByClass(this.getClass());
        MultipleInputs.addInputPath(job7, out1, TextInputFormat.class, DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);
        MultipleInputs.addInputPath(job7, out6, TextInputFormat.class, DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(Text.class);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7, out7);
        job7.setNumReduceTasks(1);

        // -- step 8: save the final recommendations to MySQL (input: result of step 7) --
        Job job8 = Job.getInstance(conf, SaveRecommendResultToDB.class.getSimpleName());
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(SaveRecommendResultToDB.SaveRecommendResultToDBMapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(Text.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8, out7);
        job8.setReducerClass(SaveRecommendResultToDB.SaveRecommendResultToDBReducer.class);
        job8.setOutputKeyClass(RecommendResultDB.class);
        job8.setOutputValueClass(NullWritable.class);
        job8.setOutputFormatClass(DBOutputFormat.class);
        Properties prop = new Properties();
        prop.load(this.getClass().getResourceAsStream("/db.properties"));
        DBConfiguration.configureDB(job8.getConfiguration(),
                prop.getProperty("grms.driver"),
                prop.getProperty("grms.url"),
                prop.getProperty("grms.username"),
                prop.getProperty("grms.password"));
        DBOutputFormat.setOutput(job8, prop.getProperty("grms.tblname"), "uid", "gid", "exp");

        // Wrap every Job in a ControlledJob.
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration()); cj1.setJob(job1);
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration()); cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(job3.getConfiguration()); cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(job4.getConfiguration()); cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(job5.getConfiguration()); cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(job6.getConfiguration()); cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(job7.getConfiguration()); cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(job8.getConfiguration()); cj8.setJob(job8);

        // Declare the dependencies between the jobs.
        cj2.addDependingJob(cj1);
        cj3.addDependingJob(cj2);
        cj4.addDependingJob(cj1);
        cj5.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj1);
        cj7.addDependingJob(cj6);
        cj8.addDependingJob(cj7);

        // Build the JobControl object and add every ControlledJob.
        JobControl jc = new JobControl(this.getClass().getSimpleName());
        jc.addJob(cj1); jc.addJob(cj2); jc.addJob(cj3); jc.addJob(cj4);
        jc.addJob(cj5); jc.addJob(cj6); jc.addJob(cj7); jc.addJob(cj8);

        // Run the job flow in a separate thread and wait until all jobs have finished.
        Thread thread = new Thread(jc);
        thread.start();
        do {
            for (ControlledJob cj : jc.getRunningJobList()) {
                cj.getJob().monitorAndPrintJob();
            }
        } while (!jc.allFinished());
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GoodsRecommendationManagementSystemJobController(), args));
    }
}

At this point the main methods of all the classes above can be commented out; package the jar and run the GoodsRecommendationManagementSystemJobController program on the YARN cluster, paying attention to how the -Din* and -Dout* parameters are passed.

Our product recommendation algorithm is now essentially complete.

If you can, write a shell script for launching the job on the YARN cluster so you don't have to type every -D parameter by hand, which gets tiresome.

Appendix: db.properties

grms.driver=com.mysql.jdbc.Driver
grms.url=jdbc:mysql://ud2:5721/grms
grms.username=root
grms.password=root
grms.tblname=results
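Step 15 assumes that the grms.results table already exists. As an illustration only, a small JDBC sketch like the following could create it from the same db.properties; it assumes the MySQL JDBC driver is on the classpath (it is not among the dependencies in the pom.xml above) and uses the table layout given in the notes of step 15.

import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;

public class CreateResultsTable {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        try (InputStream in = CreateResultsTable.class.getResourceAsStream("/db.properties")) {
            prop.load(in);   // read the connection settings shown above
        }
        Class.forName(prop.getProperty("grms.driver"));   // load the MySQL driver named in db.properties
        try (Connection conn = DriverManager.getConnection(
                prop.getProperty("grms.url"),
                prop.getProperty("grms.username"),
                prop.getProperty("grms.password"));
             Statement stmt = conn.createStatement()) {
            // Column layout from the note in step 15: uid/gid as varchar(20), exp as int.
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS results("
                    + "uid VARCHAR(20), gid VARCHAR(20), exp INT)");
        }
    }
}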

The JobUtil class below simplifies the configuration of all jobs (except the ones that take multiple Mapper inputs); adapt your own drivers to it based on the code. A short usage sketch follows the class.

package com.briup.bigdata.project.grms.utils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class JobUtil {

    private static Job job;
    private static String in;
    private static String out;
    private static Configuration configuration;

    // setConf: build the Job object and remember the input/output paths.
    public static void setConf(
            Configuration c,    // the configuration object of the whole MapReduce job
            Class cz,           // the main class contained in the jar that the job runs from
            String name,        // the job name
            String vin,         // the input path
            String vout         // the output path
    ) {
        try {
            if (c == null) {
                throw new Exception("The configuration object must not be null.");
            }
            job = Job.getInstance(c, name);   // build the Job with the configuration and job name
            job.setJarByClass(cz);            // set the main class of the jar to run
            in = vin;                         // keep the input path in a static field
            out = vout;                       // keep the output path in a static field
            configuration = c;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // setMapper: configure the Mapper, its output types and the input format.
    public static void setMapper(
            Class<? extends Mapper> x,        // the Mapper class of the job
            Class<? extends Writable> y,      // the Mapper output key type
            Class<? extends Writable> z,      // the Mapper output value type
            Class<? extends InputFormat> o    // the input format of the job
    ) {
        try {
            job.setMapperClass(x);
            job.setMapOutputKeyClass(y);
            job.setMapOutputValueClass(z);
            job.setInputFormatClass(o);
            // call o.addInputPath(job, new Path(in)) via reflection
            o.getMethod("addInputPath", Job.class, Path.class).invoke(null, job, new Path(in));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // setReducer: configure the Reducer, its output types, the output format and the number of reducers.
    public static void setReducer(
            Class<? extends Reducer> a,       // the Reducer class of the job
            Class<? extends Writable> b,      // the Reducer output key type
            Class<? extends Writable> c,      // the Reducer output value type
            Class<? extends OutputFormat> d,  // the output format of the job
            int rnum                          // the number of reduce tasks
    ) {
        try {
            job.setReducerClass(a);
            job.setOutputKeyClass(b);
            job.setOutputValueClass(c);
            job.setOutputFormatClass(d);
            // call d.setOutputPath(job, new Path(out)) via reflection
            d.getMethod("setOutputPath", Job.class, Path.class).invoke(null, job, new Path(out));
            job.setNumReduceTasks(rnum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void setTotalSort(float a, int b, int c) throws InterruptedException, IOException, ClassNotFoundException, URISyntaxException {
        job.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.writePartitionFile(job, new RandomSampler(a, b, c));
        job.addCacheFile(new URI(TotalOrderPartitioner.getPartitionFile(getConfiguration())));
    }

    public static void setSecondarySort(Class<? extends WritableComparator> g, Class<? extends WritableComparator> s, Class<? extends Partitioner> p) throws ClassNotFoundException {
        job.setPartitionerClass(p);
        job.setGroupingComparatorClass(g);
        job.setSortComparatorClass(s);
    }

    public static void setCombiner(boolean flag, Class<? extends Reducer> combiner) {
        if (flag && combiner != null) job.setCombinerClass(combiner);
    }

    public static int commit() throws Exception {
        return job.waitForCompletion(true) ? 0 : 1;   // submit the job and wait for completion
    }

    public static Job getJob() { return job; }
    public static void setJob(Job xyz) { JobUtil.job = xyz; }
    public static Configuration getConfiguration() { return job.getConfiguration(); }
    public static void setConfiguration(Configuration configuration) { JobUtil.configuration = configuration; }
}
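As a rough illustration of how the utility is meant to be used (this driver body is not part of the original code), the run() method of a single-input job such as UserBuyGoodsList could then shrink to something like the following, assuming the same imports as before plus com.briup.bigdata.project.grms.utils.JobUtil:

public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // build the Job and remember the input/output paths taken from -Din / -Dout
    JobUtil.setConf(conf, UserBuyGoodsList.class, this.getClass().getSimpleName(),
            conf.get("in"), conf.get("out"));
    JobUtil.setMapper(UserBuyGoodsListMapper.class, Text.class, Text.class, TextInputFormat.class);
    JobUtil.setReducer(UserBuyGoodsListReducer.class, Text.class, Text.class, TextOutputFormat.class, 1);
    return JobUtil.commit();
}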
