Common friends: computing pairwise intersections over a large collection of sets
Goal: let U = {U1, U2, ..., Un} be the set of all users. For every pair (Ui, Uj) with i != j, find the common friends of Ui and Uj.
Premise: friendship is bidirectional (if Ui is a friend of Uj, then Uj is a friend of Ui).
Input: <person><,><friend1>< ><friend2>< >...<friendN>, i.e. a person, a comma, and then that person's friends separated by single spaces:

100,200 300 400 500 600
200,100 300 400
300,100 200 400 500
400,100 200 300
500,100 300
600,100
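Each record can be tokenized with a single split on either delimiter. A minimal sketch (plain JDK; the class name is mine):

public class ParseDemo {
    public static void main(String[] args) {
        String line = "100,200 300 400 500 600";
        // split on the comma after the person or on the spaces between friends
        String[] tokens = line.split("[,\\s]+");
        System.out.println("person = " + tokens[0]);     // 100
        for (int i = 1; i < tokens.length; i++) {
            System.out.println("friend = " + tokens[i]); // 200 300 400 500 600
        }
    }
}

The mappers below tokenize their input lines the same way (via commons-lang StringUtils).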

Solution 1: a POJO common-friends solution
Let {A1, A2, ..., Am} be the friend set of User1 and {B1, B2, ..., Bn} the friend set of User2. The common friends of User1 and User2 are then the intersection (the shared elements) of the two sets:

public static Set<Integer> intersection(Set<Integer> user1friends, Set<Integer> user2friends) {
    if (user1friends == null || user2friends == null) {
        return null;
    }
    if (user1friends.isEmpty() || user2friends.isEmpty()) {
        return null;
    }
    // iterate over the smaller set and probe the larger one
    if (user1friends.size() < user2friends.size()) {
        return intersect(user1friends, user2friends);
    } else {
        return intersect(user2friends, user1friends);
    }
}

public static Set<Integer> intersect(Set<Integer> small, Set<Integer> large) {
    Set<Integer> result = new TreeSet<Integer>();
    // iterate over the small set for performance
    for (Integer x : small) {
        if (large.contains(x)) {
            result.add(x);
        }
    }
    return result;
}
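The two methods above handle a single pair; the stated goal covers every pair (Ui, Uj) with i != j. A minimal driving loop follows (the class name and the friendsOf map are my additions; for brevity it uses TreeSet.retainAll, for which the intersection() above can be substituted). For n users it performs n*(n-1)/2 intersections:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class AllPairsCommonFriends {
    public static void main(String[] args) {
        // adjacency built from the sample input above: user -> set of friends
        Map<Integer, Set<Integer>> friendsOf = new TreeMap<Integer, Set<Integer>>();
        friendsOf.put(100, new TreeSet<Integer>(Arrays.asList(200, 300, 400, 500, 600)));
        friendsOf.put(200, new TreeSet<Integer>(Arrays.asList(100, 300, 400)));
        friendsOf.put(300, new TreeSet<Integer>(Arrays.asList(100, 200, 400, 500)));
        friendsOf.put(400, new TreeSet<Integer>(Arrays.asList(100, 200, 300)));
        friendsOf.put(500, new TreeSet<Integer>(Arrays.asList(100, 300)));
        friendsOf.put(600, new TreeSet<Integer>(Arrays.asList(100)));
        List<Integer> users = new ArrayList<Integer>(friendsOf.keySet());
        for (int i = 0; i < users.size(); i++) {
            for (int j = i + 1; j < users.size(); j++) {
                // JDK shortcut for the set intersection
                Set<Integer> common = new TreeSet<Integer>(friendsOf.get(users.get(i)));
                common.retainAll(friendsOf.get(users.get(j)));
                System.out.println(users.get(i) + "," + users.get(j) + "\t" + common);
            }
        }
    }
}

Note that this loop enumerates every pair, whereas the MapReduce job below only emits pairs who are themselves friends, because the mapper generates a key only for person-friend edges. The quadratic pass over all pairs is also what makes the single-machine solution impractical at scale and motivates the MapReduce formulation.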

Solution 2: a Hadoop/MapReduce implementation. The idea:
For the input line 100,200 300 400 500 600, the mapper generates:
([100,200],[200 300 400 500 600]), meaning: one of users 100 and 200 has the friend list [200 300 400 500 600] --------(1)
([100,300],[200 300 400 500 600]), meaning: one of users 100 and 300 has the friend list [200 300 400 500 600]
([100,400],[200 300 400 500 600]), meaning: one of users 100 and 400 has the friend list [200 300 400 500 600]
([100,500],[200 300 400 500 600]), meaning: one of users 100 and 500 has the friend list [200 300 400 500 600]
([100,600],[200 300 400 500 600]), meaning: one of users 100 and 600 has the friend list [200 300 400 500 600]
For the input line 200,100 300 400, the mapper generates:
([100,200],[100 300 400]), meaning: one of users 100 and 200 has the friend list [100 300 400] --------(2)
([200,300],[100 300 400]), meaning: one of users 200 and 300 has the friend list [100 300 400]
([200,400],[100 300 400]), meaning: one of users 200 and 400 has the friend list [100 300 400]
...
Because each pair key is written in numerically sorted order, (1) and (2) carry the same key [100,200]; when we reduce by key, they reach the same reducer:
([100,200], ([200 300 400 500 600], [100 300 400]))
All that remains is to intersect the two lists. Maintain a <String,Integer> map that counts how many of the lists each element appears in:
(100,1)
(200,1)
(300,2)
(400,2)
(500,1)
(600,1)
Then scan the map for keys whose count is 2 (the number of lists): 300 and 400.
Add them to the result value and emit ([100,200],[300 400]).
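To make the counting step concrete, here is a minimal, self-contained sketch (plain JDK; the class and variable names are mine) that reproduces the computation for the key [100,200]:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountingIntersectionDemo {
    public static void main(String[] args) {
        // the two friend lists that reach the reducer for the key [100,200]
        List<List<Integer>> values = new ArrayList<List<Integer>>();
        values.add(Arrays.asList(200, 300, 400, 500, 600)); // from record (1)
        values.add(Arrays.asList(100, 300, 400));           // from record (2)
        // count in how many of the lists each element appears
        Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
        for (List<Integer> friends : values) {
            for (Integer f : friends) {
                Integer c = counts.get(f);
                counts.put(f, (c == null) ? 1 : c + 1);
            }
        }
        // an element common to the pair appears in every list
        List<Integer> common = new ArrayList<Integer>();
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            if (e.getValue() == values.size()) {
                common.add(e.getKey());
            }
        }
        Collections.sort(common);
        System.out.println(common); // prints [300, 400]
    }
}

The reducers below implement exactly this loop, with numOfValues playing the role of values.size().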



Implementation 1: when generating key-value pairs like ([100,200],[200 300 400 500 600]), the friend list [200 300 400 500 600] is stored as a Text
package commonfriends;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;

public class CommonFriendsMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    private static final Text REDUCER_KEY = new Text();
    private static final Text REDUCER_VALUE = new Text();

    // Joins tokens[1..] (the person's friends) with commas.
    // A person with exactly one friend (tokens.length == 2) yields an empty
    // list: that single friend is the other half of the pair, so the pair
    // cannot have any common friends.
    static String getFriends(String[] tokens) {
        if (tokens.length == 2) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
            builder.append(tokens[i]);
            if (i < (tokens.length - 1)) {
                builder.append(",");
            }
        }
        return builder.toString();
    }

    // Orders each pair numerically so that, e.g., (100,200) and (200,100)
    // produce the same reducer key and meet at the same reducer.
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        } else {
            return friend + "," + person;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tokenize on either delimiter (the comma after the person, the
        // spaces between friends): tokens[0] is the person, the rest are
        // that person's friends.
        String[] tokens = StringUtils.split(value.toString(), " ,");
        // create the reducer value: this person's complete friend list
        String friends = getFriends(tokens);
        REDUCER_VALUE.set(friends);
        String person = tokens[0];
        for (int i = 1; i < tokens.length; i++) {
            String friend = tokens[i];
            String reducerKeyAsString = buildSortedKey(person, friend);
            REDUCER_KEY.set(reducerKeyAsString);
            context.write(REDUCER_KEY, REDUCER_VALUE);
        }
    }
}
package commonfriends;

import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.commons.lang.StringUtils;

public class CommonFriendsReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Friendship is bidirectional, so each pair key receives exactly two
        // values: one friend list from each side of the pair.
        // map<friend, count>: in how many of the lists the friend appears
        Map<String, Integer> map = new HashMap<String, Integer>();
        Iterator<Text> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            String friends = iterator.next().toString();
            if (friends.equals("")) {
                // an empty list means one side has no friends beyond the
                // pair itself, so the intersection is necessarily empty
                context.write(key, new Text("[]"));
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }
        // a common friend appears in every list, i.e. numOfValues times
        List<String> commonFriends = new ArrayList<String>();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }
        // send it to the output
        context.write(key, new Text(commonFriends.toString()));
    }

    static void addFriends(Map<String, Integer> map, String friendsList) {
        String[] friends = StringUtils.split(friendsList, ",");
        for (String friend : friends) {
            Integer count = map.get(friend);
            if (count == null) {
                map.put(friend, 1);
            } else {
                map.put(friend, count + 1);
            }
        }
    }
}
package commonfriends;

import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

public class CommonFriendsDriver extends Configured implements Tool {

    private static final Logger theLogger = Logger.getLogger(CommonFriendsDriver.class);

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("CommonFriendsDriver");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);    // the keys are pairs like (person1,person2)
        job.setOutputValueClass(Text.class);  // the values are friend lists
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsReducer.class);
        // args[0] = input directory, args[1] = output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean status = job.waitForCompletion(true);
        theLogger.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    /**
     * The main driver for the common-friends map/reduce job.
     *
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException("usage: Argument 1: input dir, Argument 2: output dir");
        }
        theLogger.info("inputDir=" + args[0]);
        theLogger.info("outputDir=" + args[1]);
        int jobStatus = submitJob(args);
        theLogger.info("jobStatus=" + jobStatus);
        System.exit(jobStatus);
    }

    /**
     * Submits the map/reduce job via ToolRunner.
     */
    public static int submitJob(String[] args) throws Exception {
        return ToolRunner.run(new CommonFriendsDriver(), args);
    }
}


Implementation 2: when generating key-value pairs like ([100,200],[200 300 400 500 600]), the friend list [200 300 400 500 600] is stored as an ArrayListOfLongsWritable (from the Cloud9 library)

package commonfriends;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsMapperUsingList
        extends Mapper<LongWritable, Text, Text, ArrayListOfLongsWritable> {

    private static final Text REDUCER_KEY = new Text();

    // Same as in CommonFriendsMapper, but the friend list is emitted as an
    // ArrayListOfLongsWritable instead of a comma-joined Text. A person with
    // exactly one friend yields an empty list.
    static ArrayListOfLongsWritable getFriends(String[] tokens) {
        ArrayListOfLongsWritable list = new ArrayListOfLongsWritable();
        if (tokens.length == 2) {
            return list;
        }
        for (int i = 1; i < tokens.length; i++) {
            list.add(Long.parseLong(tokens[i]));
        }
        return list;
    }

    // Orders each pair numerically so both sides of a friendship produce
    // the same reducer key.
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        } else {
            return friend + "," + person;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // tokenize on either delimiter: tokens[0] is the person,
        // the rest are that person's friends
        String[] tokens = StringUtils.split(value.toString(), " ,");
        // create the reducer value: this person's complete friend list
        ArrayListOfLongsWritable friends = getFriends(tokens);
        String person = tokens[0];
        for (int i = 1; i < tokens.length; i++) {
            String friend = tokens[i];
            String reducerKeyAsString = buildSortedKey(person, friend);
            REDUCER_KEY.set(reducerKeyAsString);
            context.write(REDUCER_KEY, friends);
        }
    }
}
package commonfriends;

import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsReducerUsingList
        extends Reducer<Text, ArrayListOfLongsWritable, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<ArrayListOfLongsWritable> values, Context context)
            throws IOException, InterruptedException {
        // map<userID, count>: in how many of the lists the user appears
        Map<Long, Integer> map = new HashMap<Long, Integer>();
        Iterator<ArrayListOfLongsWritable> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            ArrayListOfLongsWritable friends = iterator.next();
            if (friends.size() == 0) {
                // an empty list (emitted by the mapper for a person with a
                // single friend) means the intersection is necessarily empty
                context.write(key, new Text("[]"));
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }
        // a common friend appears in every list, i.e. numOfValues times
        List<Long> commonFriends = new ArrayList<Long>();
        for (Map.Entry<Long, Integer> entry : map.entrySet()) {
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }
        // send it to the output
        context.write(key, new Text(commonFriends.toString()));
    }

    static void addFriends(Map<Long, Integer> map, ArrayListOfLongsWritable friendsList) {
        Iterator<Long> iterator = friendsList.iterator();
        while (iterator.hasNext()) {
            long id = iterator.next();
            Integer count = map.get(id);
            if (count == null) {
                map.put(id, 1);
            } else {
                map.put(id, count + 1);
            }
        }
    }
}
package commonfriends;

import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsDriverUsingList extends Configured implements Tool {

    private static final Logger theLogger = Logger.getLogger(CommonFriendsDriverUsingList.class);

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("CommonFriendsDriverUsingList");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);  // the keys are pairs like (person1,person2)
        // the mapper emits friend lists, the reducer emits the common friends as Text
        job.setMapOutputValueClass(ArrayListOfLongsWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(CommonFriendsMapperUsingList.class);
        job.setReducerClass(CommonFriendsReducerUsingList.class);
        // args[0] = input directory, args[1] = output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean status = job.waitForCompletion(true);
        theLogger.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    /**
     * The main driver for the common-friends map/reduce job.
     *
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException("usage: Argument 1: input dir, Argument 2: output dir");
        }
        theLogger.info("inputDir=" + args[0]);
        theLogger.info("outputDir=" + args[1]);
        int jobStatus = submitJob(args);
        theLogger.info("jobStatus=" + jobStatus);
        System.exit(jobStatus);
    }

    /**
     * Submits the map/reduce job via ToolRunner.
     */
    public static int submitJob(String[] args) throws Exception {
        return ToolRunner.run(new CommonFriendsDriverUsingList(), args);
    }
}
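Compared with Implementation 1, the list variant spares the reducer from re-parsing comma-joined strings: the friend IDs travel through the shuffle as binary longs. As a quick plausibility check, the sketch below (my addition; it assumes Cloud9 is on the classpath and uses only the ArrayListOfLongsWritable methods already seen above plus the standard Writable contract) round-trips a list through Hadoop's serialization buffers:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class ListWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        ArrayListOfLongsWritable friends = new ArrayListOfLongsWritable();
        friends.add(300L);
        friends.add(400L);
        // serialize, as Hadoop does between the map and reduce phases
        DataOutputBuffer out = new DataOutputBuffer();
        friends.write(out);
        // deserialize into a fresh instance
        ArrayListOfLongsWritable copy = new ArrayListOfLongsWritable();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);
        System.out.println(copy); // should print the same two IDs, 300 and 400
    }
}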
Result (identical for both implementations):
100,200    [300, 400]
100,300    [200, 400, 500]
100,400    [200, 300]
100,500    [300]
100,600    []
200,300    [100, 400]
200,400    [100, 300]
300,400    [100, 200]
300,500    [100]