Product Recommendation
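The pipeline is an item-based collaborative filtering (ItemCF) job chain made up of six MapReduce steps: deduplicate the raw action log, score each user's items, build the item co-occurrence matrix, multiply that matrix by the user score vectors, sum the partial products, and finally sort each user's candidates into a top-10 recommendation list. Judging from the comments in Step1 and Step2, the raw input under /data/itemcf/input/ is a CSV file shaped roughly like this (header line plus one sample record quoted from the code below):

item_id,user_id,action,vtime
i160,u2781,click,2014/9/23 22:25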

Entry point

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
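/**
 * Entry point. Builds the Hadoop configuration, wires each step's output
 * directory to the next step's input directory, and runs the selected step.
 * The R map assigns a weight to each user action type.
 */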
public class StartRun {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Run in local mode.
        conf.set("mapreduce.framework.name", "local");

        // Chain the steps: each step's output directory is the next step's input directory.
        Map<String, String> paths = new HashMap<String, String>();
        paths.put("Step1Input", "/data/itemcf/input/");
        paths.put("Step1Output", "/data/itemcf/output/step1");
        paths.put("Step2Input", paths.get("Step1Output"));
        paths.put("Step2Output", "/data/itemcf/output/step2");
        paths.put("Step3Input", paths.get("Step2Output"));
        paths.put("Step3Output", "/data/itemcf/output/step3");
        paths.put("Step4Input1", paths.get("Step2Output"));
        paths.put("Step4Input2", paths.get("Step3Output"));
        paths.put("Step4Output", "/data/itemcf/output/step4");
        paths.put("Step5Input", paths.get("Step4Output"));
        paths.put("Step5Output", "/data/itemcf/output/step5");
        paths.put("Step6Input", paths.get("Step5Output"));
        paths.put("Step6Output", "/data/itemcf/output/step6");

        // The steps are run one at a time; uncomment the step you want to execute.
        // Step1.run(conf, paths);   // remove duplicate lines
        // Step2.run(conf, paths);
        // Step3.run(conf, paths);
        // Step4.run(conf, paths);
        // Step5.run(conf, paths);
        Step6.run(conf, paths);
    }

    // Weight of each user action: the stronger the action, the higher the score.
    public static Map<String, Integer> R = new HashMap<String, Integer>();
    static {
        R.put("click", 1);
        R.put("collect", 2);
        R.put("cart", 3);
        R.put("alipay", 4);
    }
}

Step1: remove duplicate records

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
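/**
 * Step1: deduplication. Reads the raw action log, skips the header line
 * (item_id,user_id,action,vtime) and uses each whole record as the map output
 * key so that identical records collapse to a single line in the reducer.
 */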
public class Step1 {
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step1");
            job.setJarByClass(Step1.class);
            job.setMapperClass(Step1_Mapper.class);
            job.setReducerClass(Step1_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step1Input")));
            Path output = new Path(paths.get("Step1Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean b = job.waitForCompletion(true);
            return b;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // The key is the byte offset of the line. Skip the line at offset 0,
        // i.e. the header "item_id,user_id,action,vtime", and process the rest.
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (key.get() != 0) {
                // Use the whole record as the key; duplicates collapse in the reducer.
                context.write(value, NullWritable.get());
            }
        }
    }

    private static class Step1_Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

Step2: per-user item scores

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
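/**
 * Step2: per-user scoring. Converts each action into its weight from StartRun.R
 * and sums the weights per (user, item), producing one line per user such as
 * u2778    i160:8,i270:1,i319:2,...
 */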
public class Step2 {
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step2");
            job.setJarByClass(Step2.class);
            job.setMapperClass(Step2_Mapper.class);
            job.setReducerClass(Step2_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step2Input")));
            Path output = new Path(paths.get("Step2Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean b = job.waitForCompletion(true);
            return b;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Incoming record format: i160,u2781,click,2014/9/23 22:25
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String item = tokens[0];
            String user = tokens[1];
            String action = tokens[2];
            Text k = new Text(user);
            Integer rv = StartRun.R.get(action);
            Text v = new Text(item + ":" + rv.intValue());
            // Output format: u2781    i160:2
            context.write(k, v);
        }
    }

    private static class Step2_Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Incoming values look like i160:2 for a single user key.
            Map<String, Integer> r = new HashMap<String, Integer>();
            for (Text value : values) {
                String[] vs = value.toString().split(":");
                String item = vs[0];
                Integer action = Integer.parseInt(vs[1]);
                // If the item has been seen before, add the new score to the running
                // total, otherwise start from 0; the sum is the user's overall score
                // for that item.
                action = ((Integer) (r.get(item) == null ? 0 : r.get(item))).intValue() + action;
                r.put(item, action);
            }
            StringBuffer sb = new StringBuffer();
            for (Map.Entry<String, Integer> entry : r.entrySet()) {
                sb.append(entry.getKey() + ":" + entry.getValue().intValue() + ",");
            }
            // One output line per user: every item the user touched with its total score.
            context.write(key, new Text(sb.toString()));
        }
    }
}

Step3: item co-occurrence matrix

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
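/**
 * Step3: co-occurrence matrix. For every user, emits a count of 1 for each
 * ordered pair of items in that user's list; the reducer sums the counts,
 * giving the number of users who touched both items, e.g. i100:i184    2.
 */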
public class Step3 {
    private final static Text K = new Text();
    private final static IntWritable V = new IntWritable(1);

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step3");
            job.setJarByClass(Step3.class);
            job.setMapperClass(Step3_Mapper.class);
            job.setReducerClass(Step3_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step3Input")));
            Path output = new Path(paths.get("Step3Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean b = job.waitForCompletion(true);
            return b;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step3_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Incoming value: u2778    i160:8,i270:1,i319:2,i352:5,i487:1,i325:1,i249:2,
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            // items holds entries such as i160:8
            String[] items = tokens[1].split(",");
            for (int i = 0; i < items.length; i++) {
                String itemA = items[i].split(":")[0];
                for (int j = 0; j < items.length; j++) {
                    String itemB = items[j].split(":")[0];
                    // One count for every (itemA, itemB) pair in the same user's list.
                    K.set(itemA + ":" + itemB);
                    context.write(K, V);
                }
            }
        }
    }

    private static class Step3_Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // The sum is the number of users who interacted with both item A and item B,
        // i.e. one cell of the co-occurrence matrix.
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            V.set(sum);
            context.write(key, V);
        }
    }
}

Step4: multiply the co-occurrence matrix by the user score vectors

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
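/**
 * Step4: matrix multiplication (partial products). Reads both the step2 and
 * step3 outputs, joins them on the item ID in the reducer, and for every
 * (related item, user) combination emits userID -> relatedItem,count*score.
 */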
public class Step4 {
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step4");
            job.setJarByClass(Step4.class);
            job.setMapperClass(Step4_Mapper.class);
            job.setReducerClass(Step4_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path[] {
                    new Path(paths.get("Step4Input1")),
                    new Path(paths.get("Step4Input2"))});
            Path output = new Path(paths.get("Step4Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean b = job.waitForCompletion(true);
            return b;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Two input directories are read, one from step2 and one from step3;
            // flag is the name of the directory the current split comes from.
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();
            System.out.println(flag + "**************************");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // step2 lines: u2778    i160:8,i270:1,i319:2,i352:5,i487:1,i325:1,i249:2,
            // step3 lines: i100:i184    2
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());
            if (flag.equals("step3")) {
                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];
                Text k = new Text(itemID1);
                Text v = new Text("A:" + itemID2 + "," + num);
                // Output: i100    A:i184,2
                context.write(k, v);
            } else if (flag.equals("step2")) {
                // After splitting on tab and comma the tokens are:
                // u2778 i160:8 i270:1 i319:2 i352:5 i487:1 i325:1 i249:2
                //   0     1      2      3      4      5      6      7
                String userID = tokens[0];
                for (int i = 1; i < tokens.length; i++) {
                    String[] vector = tokens[i].split(":");
                    String itemID = vector[0];
                    String pref = vector[1];
                    Text k = new Text(itemID);
                    Text v = new Text("B:" + userID + "," + pref);
                    // Output: i160    B:u2778,8
                    //         i270    B:u2778,1
                    context.write(k, v);
                }
            }
        }
    }

    private static class Step4_Reduce extends Reducer<Text, Text, Text, Text> {
        // Incoming values for one item key, e.g.:
        // i100    A:i184,2
        // i100    B:u2778,1
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, Integer> mapA = new HashMap<String, Integer>();
            Map<String, Integer> mapB = new HashMap<String, Integer>();
            for (Text line : values) {
                String val = line.toString();
                if (val.startsWith("A:")) {
                    // Co-occurrence counts: related item -> count
                    String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                    try {
                        mapA.put(kv[0], Integer.parseInt(kv[1]));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else if (val.startsWith("B:")) {
                    // Scores of different users for this item: user -> score
                    String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                    try {
                        mapB.put(kv[0], Integer.parseInt(kv[1]));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            double result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                // One related item from the co-occurrence matrix, e.g. i184,2
                String mapk = iter.next();
                int num = mapA.get(mapk).intValue();
                Iterator<String> iterb = mapB.keySet().iterator();
                while (iterb.hasNext()) {
                    // Each user's score for the current item
                    String mapkb = iterb.next();
                    int pref = mapB.get(mapkb).intValue();
                    // Partial product of the matrix multiplication: co-occurrence count * user score
                    result = num * pref;
                    Text k = new Text(mapkb);
                    Text v = new Text(mapk + "," + result);
                    context.write(k, v);
                }
            }
        }
    }
}

Step5: sum the partial products

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
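/**
 * Step5: aggregation. Sums the partial products per (user, item) pair to get
 * the user's final recommendation score for each candidate item.
 */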
public class Step5 {
    private final static Text K = new Text();
    private final static Text V = new Text();

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step5");
            job.setJarByClass(Step5.class);
            job.setMapperClass(Step5_Mapper.class);
            job.setReducerClass(Step5_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step5Input")));
            Path output = new Path(paths.get("Step5Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean b = job.waitForCompletion(true);
            return b;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Incoming line: userID <tab> itemID,partialScore
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1] + "," + tokens[2]);
            context.write(k, v);
        }
    }

    private static class Step5_Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Accumulate the partial products per item to get the user's final score for it.
            Map<String, Double> map = new HashMap<String, Double>();
            for (Text line : values) {
                String[] tokens = line.toString().split(",");
                String itemID = tokens[0];
                Double source = Double.parseDouble(tokens[1]);
                if (map.containsKey(itemID)) {
                    map.put(itemID, map.get(itemID) + source);
                } else {
                    map.put(itemID, source);
                }
            }
            Iterator<String> iter = map.keySet().iterator();
            while (iter.hasNext()) {
                String itemID = iter.next();
                double source = map.get(itemID);
                Text v = new Text(itemID + "," + source);
                context.write(key, v);
            }
        }
    }
}

Step6: top-10 recommendations per user

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;
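/**
 * Step6: top-N. Uses a composite key (user ID, score) with a custom sort
 * comparator (score descending within a user) and a grouping comparator
 * (group by user only), so the reducer can simply take the first 10 values
 * as the user's recommendation list.
 */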
public class Step6 {
    private final static Text K = new Text();
    private final static Text V = new Text();

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step6");
            job.setJarByClass(Step6.class);
            job.setMapperClass(Step6_Mapper.class);
            job.setReducerClass(Step6_Reduce.class);
            // Secondary sort: order by user, then by score descending; group by user only.
            job.setSortComparatorClass(NumSort.class);
            job.setGroupingComparatorClass(UserGroup.class);
            job.setMapOutputKeyClass(PairWritable.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step6Input")));
            Path output = new Path(paths.get("Step6Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean b = job.waitForCompletion(true);
            return b;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step6_Mapper extends Mapper<LongWritable, Text, PairWritable, Text> {
        // Incoming line: userID <tab> itemID,score
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());
            String u = tokens[0];
            String item = tokens[1];
            String num = tokens[2];
            PairWritable k = new PairWritable();
            k.setUid(u);
            k.setNum(Double.parseDouble(num));
            V.set(item + ":" + num);
            context.write(k, V);
        }
    }

    private static class Step6_Reduce extends Reducer<PairWritable, Text, Text, Text> {
        @Override
        protected void reduce(PairWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Values arrive sorted by score descending, so the first 10 are the recommendations.
            int i = 0;
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                if (i == 10) {
                    break;
                }
                sb.append(value.toString() + ",");
                i++;
            }
            K.set(key.getUid());
            V.set(sb.toString());
            context.write(K, V);
        }
    }

    // Sort comparator: by user ID, then by score in descending order.
    private static class NumSort extends WritableComparator {
        public NumSort() {
            super(PairWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable o1 = (PairWritable) a;
            PairWritable o2 = (PairWritable) b;
            int r = o1.getUid().compareTo(o2.getUid());
            if (r == 0) {
                return -Double.compare(o1.getNum(), o2.getNum());
            }
            return r;
        }
    }

    // Grouping comparator: all records of the same user go to one reduce() call.
    private static class UserGroup extends WritableComparator {
        public UserGroup() {
            super(PairWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable o1 = (PairWritable) a;
            PairWritable o2 = (PairWritable) b;
            return o1.getUid().compareTo(o2.getUid());
        }
    }

    // Composite key: user ID plus score, so the framework can sort by both.
    private static class PairWritable implements WritableComparable<PairWritable> {
        private String uid;
        private double num;

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public double getNum() {
            return num;
        }

        public void setNum(double num) {
            this.num = num;
        }

        @Override
        public int compareTo(PairWritable o) {
            int r = this.uid.compareTo(o.getUid());
            if (r == 0) {
                return Double.compare(this.num, o.getNum());
            }
            return r;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(uid);
            out.writeDouble(num);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.uid = in.readUTF();
            this.num = in.readDouble();
        }
    }
}
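For readers who want to see the algorithm without the MapReduce plumbing, below is a minimal in-memory sketch of the same logic as Steps 2–6. It is illustrative only: the class name ItemCFLocalSketch and every user/item ID and score in it are made up, and it keeps the simplifications of the original jobs (items a user already touched are not filtered out of their recommendations).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ItemCFLocalSketch {

    public static void main(String[] args) {
        // Step2 equivalent: each user's total score per item (hypothetical values).
        Map<String, Map<String, Integer>> userScores = new HashMap<>();
        userScores.put("u1", Map.of("i1", 4, "i2", 1));
        userScores.put("u2", Map.of("i1", 2, "i3", 3));

        // Step3 equivalent: co-occurrence count for every item pair seen in one user's list.
        Map<String, Map<String, Integer>> cooccur = new HashMap<>();
        for (Map<String, Integer> items : userScores.values()) {
            for (String a : items.keySet()) {
                for (String b : items.keySet()) {
                    cooccur.computeIfAbsent(a, k -> new HashMap<>()).merge(b, 1, Integer::sum);
                }
            }
        }

        // Steps 4 and 5 equivalent: multiply co-occurrence counts by the user's scores and sum.
        for (Map.Entry<String, Map<String, Integer>> user : userScores.entrySet()) {
            Map<String, Double> recScores = new HashMap<>();
            for (Map.Entry<String, Integer> rated : user.getValue().entrySet()) {
                Map<String, Integer> related =
                        cooccur.getOrDefault(rated.getKey(), new HashMap<>());
                for (Map.Entry<String, Integer> rel : related.entrySet()) {
                    recScores.merge(rel.getKey(),
                            (double) rel.getValue() * rated.getValue(), Double::sum);
                }
            }

            // Step6 equivalent: sort candidates by score in descending order and keep the top 10.
            List<Map.Entry<String, Double>> top = new ArrayList<>(recScores.entrySet());
            top.sort((x, y) -> Double.compare(y.getValue(), x.getValue()));
            System.out.println(user.getKey() + " -> "
                    + top.subList(0, Math.min(10, top.size())));
        }
    }
}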
