案例:文章推荐

论坛进入文章页面后,显示一个推荐列表:看过这篇文章的人还看过哪些文章,包含列为文章article、点击数count。

可能有很好很简单的解决办法,但是到最后再讲。

传统的方法是:建一张表,字段有article和user。每点击一次,增加一条记录。一个大论坛几天之内记录数就能达到千万条。而没有必要建索引,其他优化的办法,我还想不到,这样的查询别提多慢了。

传统数据库解决不了,那么分布式就该上场了。如果功能特别简单,完全可以不去使用MAPREDUCE和Hbase,自己动手搞一个吧。

这里最简单的实现:数据保存在txt文件,用Java IO读写,for循环扫描全表进行筛选,现成的Collections排序。

sql:

[sql]  view plain copy
  1. SELECT T1.ARTICLE,COUNT(*) C
  2. FROM ATB2 T1 INNER JOIN (SELECT T.USER FROM ATB2 T WHERE T.ARTICLE=888) T2
  3. WHERE T1.USER=T2.USER
  4. AND T1.ARTICLE!=888
  5. GROUP BY T1.ARTICLE
  6. ORDER BY C DESC
  7. LIMIT 10;

先查看过文章的用户列表,再查这些用户看过的文章列表,聚合,排序

[java]  view plain copy
  1. package com.src.reader;
  2. import java.io.BufferedReader;
  3. import java.io.File;
  4. import java.io.FileReader;
  5. import java.util.ArrayList;
  6. import java.util.Collections;
  7. import java.util.Comparator;
  8. import java.util.HashMap;
  9. import java.util.HashSet;
  10. import java.util.Iterator;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.Map.Entry;
  14. import java.util.Set;
  15. import com.src.entity.ATB;
  16. public class DataReader {
  17. public static void main(String[] args) throws Exception{
  18. long start = System.currentTimeMillis();
  19. select(888);
  20. long end = System.currentTimeMillis();
  21. System.out.println("Select cost time: "+(end-start)/1000.0+" seconds.");
  22. }
  23. public static void select(int article) throws Exception{
  24. //读文件到字符串
  25. File file = new File("d://b.txt");
  26. String str = reader1(file);
  27. //字符串切割为数组
  28. ATB[] all = converStr2Array1(str);
  29. System.out.println("数组长度为"+all.length);
  30. //查看过文章的用户列表,去重
  31. Set<Integer> users = getUsersByArticle(article, all);
  32. //遍历用户列表,查每个用户看过的文章,去掉参数文章(:先遍历全表再遍历用户列表)
  33. List<Integer> articles = getArticlesByUsers(all, users, article);
  34. //以文章分类,查每篇文章的总数
  35. Map<Integer,Integer> map = groupBy(articles);
  36. //排序
  37. //      List<String> result = orderAll(map);
  38. List<String> result = limitAndOrder(map, 10);
  39. //      System.out.println(result);
  40. }
  41. public static String reader1(File file) throws Exception{
  42. long start = System.currentTimeMillis();
  43. BufferedReader br = new BufferedReader(new FileReader(file));
  44. StringBuffer sb = new StringBuffer();
  45. while(br.ready()){
  46. sb.append(br.readLine());
  47. }
  48. br.close();
  49. long end = System.currentTimeMillis();
  50. System.out.println("读文件完成,用时"+(end-start)/1000.0+"秒。");
  51. return sb.toString();
  52. }
  53. public static ATB[] converStr2Array1(String str){
  54. long start = System.currentTimeMillis();
  55. String[] arr = str.split(";");
  56. System.out.println("字符串切割用时"+(System.currentTimeMillis()-start)/1000.0+"秒。");
  57. ATB[] all = new ATB[arr.length];
  58. for(int i=0;i<arr.length;i++){
  59. int article = Integer.parseInt(arr[i].split(",")[0]);
  60. int user = Integer.parseInt(arr[i].split(",")[1]);
  61. all[i] = new ATB(article,user);
  62. }
  63. long end = System.currentTimeMillis();
  64. System.out.println("字符串转换为数组完成,用时"+(end-start)/1000.0+"秒。");
  65. return all;
  66. }
  67. public static Set<Integer> getUsersByArticle(int article,ATB[] all){
  68. long start = System.currentTimeMillis();
  69. Set<Integer> set = new HashSet<Integer>();
  70. for(ATB a:all){
  71. if(a.getArticle()==article){
  72. set.add(a.getUser());
  73. }
  74. }
  75. long end = System.currentTimeMillis();
  76. System.out.println("查询user列表完成,用时"+(end-start)/1000.0+"秒。");
  77. return set;
  78. }
  79. public static List<Integer> getArticlesByUsers(ATB[] all,Set<Integer> users,int article){
  80. long start = System.currentTimeMillis();
  81. List<Integer> list = new ArrayList<Integer>();
  82. for(ATB a:all){
  83. if(article!=a.getArticle()&&users.contains(a.getUser())){
  84. list.add(a.getArticle());
  85. }
  86. }
  87. long end = System.currentTimeMillis();
  88. System.out.println("由user列表查询article列表完成,用时"+(end-start)/1000.0+"秒。");
  89. return list;
  90. }
  91. public static Map<Integer, Integer> groupBy(List<Integer> list){
  92. long start = System.currentTimeMillis();
  93. Map<Integer,Integer> map = new HashMap<Integer, Integer>();
  94. for(Integer i:list){
  95. if(map.containsKey(i)){
  96. map.put(i, map.get(i)+1);
  97. }else{
  98. map.put(i, 1);
  99. }
  100. }
  101. long end = System.currentTimeMillis();
  102. System.out.println("group 完成,用时"+(end-start)/1000.0+"秒。");
  103. return map;
  104. }
  105. public static List<String> limitAndOrder(Map<Integer,Integer> map,int limit){
  106. //排序办法:把value排序,取限制条数,去重,遍历map,由value取key
  107. long start = System.currentTimeMillis();
  108. List<String> result = new ArrayList<String>();
  109. List<Integer> values = new ArrayList<Integer>(map.values());
  110. Collections.sort(values,new Comparator<Integer>() {
  111. public int compare(Integer i,Integer j){
  112. return (j - i);
  113. }
  114. });
  115. long end = System.currentTimeMillis();
  116. System.out.println("value排序完成,用时"+(end-start)/1000.0+"秒。");
  117. values = values.subList(0, limit);
  118. //去重省略
  119. Iterator<Entry<Integer, Integer>> itr = map.entrySet().iterator();
  120. while(itr.hasNext()){
  121. Map.Entry<Integer, Integer> entry = (Entry<Integer, Integer>) itr.next();
  122. int article = entry.getKey();
  123. int count = entry.getValue();
  124. if(values.contains(count)){
  125. String str = leftFillWith0(String.valueOf(count)) + "," + String.valueOf(article);
  126. result.add(str);
  127. //由value查到的key可能有多个,一种办法是在添加前判断到达长度限制时删除result列表中count最小的行
  128. //或者再次排序和取限
  129. }
  130. }
  131. //再次排序和取限
  132. Collections.sort(result, new Comparator<String>() {
  133. public int compare(String str1,String str2){
  134. return - str1.compareTo(str2);
  135. }
  136. });
  137. result = result.subList(0, limit);
  138. long end2 = System.currentTimeMillis();
  139. System.out.println("排序和取限完成,总共用时"+(end2-start)/1000.0+"秒。");
  140. return result;
  141. }
  142. public static List<String> orderAll(Map<Integer,Integer> map){
  143. //排序办法分两种:1把value排序,由value取key;2重组字符串
  144. long start = System.currentTimeMillis();
  145. List<String> result = new ArrayList<String>();
  146. Iterator<Entry<Integer, Integer>> itr = map.entrySet().iterator();
  147. while(itr.hasNext()){
  148. Map.Entry<Integer, Integer> entry = (Entry<Integer, Integer>) itr.next();
  149. int article = entry.getKey();
  150. int count = entry.getValue();
  151. String str = leftFillWith0(String.valueOf(count)) + "," + String.valueOf(article);
  152. result.add(str);
  153. }
  154. Collections.sort(result, new Comparator<String>() {
  155. public int compare(String str1,String str2){
  156. return - str1.compareTo(str2);
  157. }
  158. });
  159. long end = System.currentTimeMillis();
  160. System.out.println("排序完成,用时"+(end-start)/1000.0+"秒。");
  161. return result;
  162. }
  163. public static String leftFillWith0(String str){
  164. int length = 8;
  165. String s = "";
  166. for(int i=0;i<length-str.length();i++){
  167. s = s + "0";
  168. }
  169. return s + str;
  170. }
  171. }
  172. //读文件完成,用时0.253秒。
  173. //字符串转换为数组完成,用时4.054秒。
  174. //数组长度为543243
  175. //查询user列表完成,用时0.0080秒。
  176. //由user列表查询article列表完成,用时0.085秒。
  177. //group 完成,用时0.04秒。
  178. //排序完成,用时0.069秒。
  179. //Select cost time: 4.526 seconds.
  180. //读文件完成,用时0.25秒。
  181. //字符串切割用时0.72秒。
  182. //字符串转换为数组完成,用时4.634秒。
  183. //数组长度为543243
  184. //查询user列表完成,用时0.0090秒。
  185. //由user列表查询article列表完成,用时0.096秒。
  186. //group 完成,用时0.036秒。
  187. //value排序完成,用时0.034秒。
  188. //排序和取限完成,总共用时0.064秒。
  189. //Select cost time: 5.113 seconds.

这段代码很多地方可以优化。

现在用十台主机作为分布式节点NODE,每台开启一个hessian服务器,提供一个处理数据的接口。一台主项目MASTER中调用这十台NODE。可以开10个线程去调用。

假如5000万条记录,新增记录时平均分布到每个节点,这样每台主机有500万数据。然后保存在100个文本文件,每个文件就是5万条记录。

然后再同时开100个甚至更多线程,同时处理这100个文件,把CPU撑到爆。

对于现在这个案例来说,分布式的处理过程是这样的:

MASTER通过hessian发起请求,只有一个参数article,每个节点接下来做的事情一样,最终要得到一个列表,如

[plain]  view plain copy
  1. [00000020,9980, 00000020,9731, 00000020,8783, 00000020,8374, 00000018,9908, 00000018,9391, 00000018,8728, 00000018,8725, 00000017,9789, 00000017,9511]

补0是为方便排序,左边是count右边是article。

这些节点得到的列表可能存在重复,如9980在节点1里面查出来点击了20次,在节点2里面查到点击了19次,这样所以要在MASTER做一个汇总,话说回来,前面一步是MAP,这一步就是REDUCE。

汇总的过程先是merge,得到以article为key,count为value的一个HashMap,然后是排序order by,然后分页。

merge和排序的开销可能又会很大,那还是老办法,再想办法分发到各个节点去做。其中排序我想到的方法,同时做分页的话比较容易,比如取点击量最大的100条,那在每个节点先做排序,取前100条返回到MASTER,然后MASTER给这1000条排序。如果查100-200条,在节点里面全表排序取前面200条,MASTER要排序的有2000条。依次下去假如每个节点总共查出10000条记录,分页在4900-5000的话,每个节点返回给MASTER有5000行,(查9000-10000行可以倒序排列只返回100行),所以这样下去还不是个完美好办法。

无所谓,再开线程,加节点就是了。

一来,在节点之中查最大100条,可以分给多个线程或者节点去做,意思是把10000条记录分成几段,查出每一段的前100条,然后汇总。

二来,在10个节点查出各自的100条之后,不会由MASTER全部处理,而是分成5份每份200条发送到五个节点分别去前100个,然后剩下500条数据,如果数据量大就再加节点。

---------------------------------------------------------------------------------------------------------------------------------------------

(三,查最中间100条的时候,能运用分布式的办法,是先查出之前的所有数据,比如用一个线程查第一个100条,第二个线程查第二个100条,全部查出,最后减去这些数据,剩下就不多了。这个方法确实是分布式,但是笨到家了。

四,全表排序如果要运用分布式,还是可以用上面的方法,100条100条的查出来,拼一下。)

---------------------------------------------------------------------------------------------------------------------------------------------

这样行不通,后来才发现其实分布式排序很简单。

比如MASTER有1000个数字,根据数字大小,分到10个节点,第一个节点保存0-100,依次101-200。然后每个节点查出来可以直接合并,这才是达到分布式的效果。

而首先还要做一个数据分布采样,以保证每个节点分到的数据量平均。采样的过程,也很容易分布化。

扩展:

如果节点数据保存在MySQL而不是文本文件上面,貌似更加方便的很。

节点可以使用内存保存管理数据。

数据异常与备份。异常的处理。

最后,这个案例还有一个办法,数据表设置两个字段,article为唯一主键,第二个列记录所有user的点击数。如果嫌这个字符串太大,那就放到文件里用Java IO读吧。

这样article和user都是唯一的。可以建索引。

如果用java做,那就保存在一个HashMap,article作为key,value也是一个HashMap,记录user和count。

这种实现,估计是最理想的。

所以,能用数据库和java做好的,就不要搞分布式了。尽量还是要用传统的方法。

java分布式简单实现相关推荐

  1. java分布式锁解决方案 redisson or ZooKeeper

    redis 分布式锁 Redisson 是 redis 官方推荐的Java分布式锁第三方框架. 高效分布式锁 当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设 ...

  2. 5个强大的Java分布式缓存框架推荐

    2019独角兽企业重金招聘Python工程师标准>>> 在开发中大型Java软件项目时,很多Java架构师都会遇到数据库读写瓶颈,如果你在系统架构时并没有将缓存策略考虑进去,或者并没 ...

  3. 5个强大的Java分布式缓存框架

    5个强大的Java分布式缓存框架 在开发中大型Java软件项目时,很多Java架构师都会遇到数据库读写瓶颈,如果你在系统架构时并没有将缓存策略考虑进去,或者并没有选择更优的缓存策略,那么到时候重构起来 ...

  4. java分布式对象——远程方法中的参数和返回值+远程对象激活

    [0]README 1)本文文字描述转自 core java volume 2, 旨在学习 java分布式对象--远程方法中的参数和返回值+远程对象激活 的相关知识: [1]远程方法中的参数和返回值 ...

  5. java分布式对象(RMI+部署使用RMI的程序)

    [0]README 1)本文文字转自 core java volume 2, 旨在学习 java 分布式对象的相关知识: 2) RMI 的实例程序为原创: 3) RMI部署步骤的测试用例,参见 htt ...

  6. Java分布式篇6——RabbitMQ

    Java分布式篇6--RabbitMQ 1.MQ(Message Queue)消息队列 消息队列中间件,是分布式系统中的重要组件 主要解决,异步处理,应用解耦,流量削峰等问题 实现高性能,高可用,可伸 ...

  7. Java分布式篇4——Redis

    Java分布式篇4--Redis 1.互联网架构的演变历程 1.1.第一阶段 数据访问量不大,简单的架构即可搞定! 1.2.第二阶段 数据访问量大,使用缓存技术来缓解数据库的压力 不同的业务访问不同的 ...

  8. JAVA分布式篇3——Dubbo

    JAVA分布式篇3--Dubbo 1.架构演变 1.1.单应用架构 当网站流量很小时,只需要一个应用,将所有的功能部署到一起(所有业务都放在一个tomcat 里),从而减少部署节点和成本 用于简化 增 ...

  9. java中如何合并两个网格,Hazelcast: Java分布式内存网格框架(平台)

    转自:http://blog.csdn.net/iihero/article/details/7385641 下边是它的宣传内容: hazelcast是一个开放源码集群和高度可扩展的数据分发平台,这是 ...

最新文章

  1. java exchange发邮件_java发送exchange邮件问题
  2. SQL SERVER的统计信息
  3. RTX 2080时代,如何打造属于自己的深度学习机器
  4. 过渡效果_12个酷炫创意的动画过渡效果AE模板
  5. 如何使用代码区分service contract和service contract quotation
  6. 统一社会信用代码 正则验证
  7. 近期找工作无果的一些感想
  8. linux下C语言简单实现线程池
  9. 【C++】CCFCSP201803-1跳一跳
  10. Android 关于fragment切换重新加载的解决分享给大家
  11. matlab 特征选择算法,特征选择、特征提取matlab算法实现(模式识别)
  12. JAVA编写俄罗斯方块
  13. 单片机定时器TMOD与TCON详解!
  14. 冰点还原无法修改计算机时间,系统还原后无法更改系统时间?这个方法必须会...
  15. CodeSmith介绍
  16. 基于线性回归房价预测散点图和折线图
  17. vmware虚拟机网络设置详解
  18. 丹麦皇家音乐学院大师班-本特历钢琴
  19. 本人GitHub网址https://github.com/LH34128
  20. 确定十二星座的日期范围

热门文章

  1. WiFi模块(ESP8266)获取时间、天气API AT指令串口调试
  2. 7.深入浅出:互补输出级——参考《模拟电子技术基础》清华大学华成英主讲
  3. The ADB binary found at XX is obsolete and has seriousperformance problems with the Android Emulator
  4. 完善跨境金融区块链服务平台,支持区域开放创新和特殊区域建设
  5. Three.js实现分区Bloom辉光效果
  6. 我们年轻时,为什么要辛苦赚钱,这是我听过的最好回答!
  7. 软件定义存储2.0,谁领风骚?
  8. 梦想照进现实:挣扎中的国产操作系统
  9. nii与nii.gz格式的关系
  10. CAD C#二次开发 圆和圆弧转Polyline线