spark kafka java api_java实现spark streaming与kafka集成进行流式计算
java实现spark streaming与kafka集成进行流式计算
2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧,http://blog.csdn.net/yujishi2/article/details/73849237。
背景:网上关于spark streaming的文章还是比较多的,可是大多数用scala实现,因我们的电商实时推荐项目以java为主,就踩了些坑,写了java版的实现,代码比较意识流,轻喷,欢迎讨论。
流程:spark streaming从kafka读用户实时点击数据,过滤数据后从redis读商品相似度矩阵,从db读user历史行为,实时计算兴趣度,并将结果写入redis一份,供api层读取展示,写入hdfs一份供离线计算准确率召回率。
补充:据了解,大型实时推荐系统里面,协同过滤一般用作生成候选集,计算兴趣读会被ctr等策略的 rerank代替,在calculateinterest中调用在线rerank服务排序。
12/13补充:召回不变,目前采用ctr预估加上规则排序,后续上ltr。
废话少说,上代码:
public class Main {
static final String ZK_QUORUM = "*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181/kafka";
static final String GROUP = "test-consumer-group";
static final String TOPICSS = "user_trace";
static final String NUM_THREAD = "64";
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("main.java.computingCenter");
// Create the context with 2 seconds batch size
//每两秒读取一次kafka
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
int numThreads = Integer.parseInt(NUM_THREAD);
Map topicMap = new HashMap();
String[] topics = TOPICSS.split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream messages =
KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);
JavaDStream lines = messages.map(new Function, String>() {
public String call(Tuple2 tuple2) {
return tuple2._2();
}
});
JavaDStream words = lines.flatMap(new FlatMapFunction() {
public Iterable call(String lines) {
//kafka数据格式:"{\"Topic\":\"user_trace\",\"PartitionKey\":\"0\",\"TimeStamp\":1471524044018,\"Data\":\"0=163670589171371918%3A196846178238302087\",\"LogId\":\"0\",\"ContentType\":\"application/x-www-form-urlencoded\"}";
List arr = new ArrayList();
for (String s : lines.split(" ")) {
Map j = JSON.parseObject(s);
String s1 = "";
String s2 = "";
try {
s1 = URLDecoder.decode(j.get("Data").toString(), "UTF-8");
s2 = s1.split("=")[1];
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
arr.add(s2);
}
return arr;
}
});
JavaPairDStream goodsSimilarityLists = words.filter(new Function() {
@Override
public Boolean call(String s) throws Exception {
//过滤非法的数据
if (s.split(":").length == 2) {
return true;
}
return false;
}
}).mapPartitionsToPair(new PairFlatMapFunction, String, String>() {
//此处分partition对每个pair进行处理
@Override
public Iterable> call(Iterator s) throws Exception {
ArrayList> result = new ArrayList>();
while (s.hasNext()) {
String x = s.next();
String userId = x.split(":")[0];
String goodsId = x.split(":")[1];
System.out.println(x);
LinkedHashMap recommendMap = null;
try {
//此service从redis读数据,进行实时兴趣度计算,推荐结果写入redis,供api层使用
CalculateInterestService calculateInterestService = new CalculateInterestService();
try {
recommendMap = calculateInterestService.calculateInterest(userId, goodsId);
} catch (Exception e) {
e.printStackTrace();
}
String text = "";
int count = 0;
for (Map.Entry entry : recommendMap.entrySet()) {
text = text + entry.getKey();
if (count == recommendMap.size() - 1) {
break;
}
count = count + 1;
text = text + "{/c}";
}
text = System.currentTimeMillis() + ":" + text;
result.add(new Tuple2(userId, text));
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
});
goodsSimilarityLists.foreachRDD(new Function, Void>() {
@Override
public Void call(JavaPairRDD rdd) throws Exception {
//打印rdd,调试方便
System.out.println(rdd.collect());
return null;
}
});
JavaPairDStream goodsSimilarityListsText = goodsSimilarityLists.mapToPair(new PairFunction, Text, Text>(){
@Override
public Tuple2 call(Tuple2 ori) throws Exception {
//此处要将tuple2转化为org.apache.hadoop.io.Text格式,使用saveAsHadoopFiles方法写入hdfs
return new Tuple2(new Text(ori._1), new Text(ori._2));
}
});
//写入hdfs
goodsSimilarityListsText.saveAsHadoopFiles("/user/hadoop/recommend_list/rl", "123", Text.class, Text.class, SequenceFileOutputFormat.class);
jssc.start();
jssc.awaitTermination();
}
}
public class CalculateInterestService {
private String dictKey = "greate_item_sim_2.0";
private String recommendTable = "great_recommend_table_2.0";
static final String HIGO_BASE_URL = "jdbc:mysql://*.*.*.*:3212/*";
static final String HIGO_BASE_USER = "*";
static final String HIGO_BASE_PASS = "*";
public LinkedHashMap calculateInterest(String userId, String traceGoodsId) {
LinkedHashMap sortedMap = new LinkedHashMap();
String[] simGoods = RedisHelper.getInstance().hget(dictKey, traceGoodsId).split(",");
//用户的历史记录,应该存action:goodsId:timestamp格式,要重构,bi写入单独的数据表中
HashMap userTrace = null;
try {
userTrace = getUserTrace(userId);
} catch (ClassNotFoundException e) {
e.printStackTrace();
return sortedMap;
}
HashMap recommendMap = new HashMap();
String[] simGoodsIds = new String[simGoods.length];
for (int i = 0; i < simGoods.length; i++) {
simGoodsIds[i] = simGoods[i].split(":")[0];
}
List pSimGoodsIds = RedisHelper.getInstance().hmget(dictKey, simGoodsIds);
HashMap predictSimGoodsIds = new HashMap();
for (int i = 0; i < simGoodsIds.length; i++) {
predictSimGoodsIds.put(Long.parseLong(simGoodsIds[i]), pSimGoodsIds.get(i));
}
for (String item : simGoods) {
//need optimised
Double totalSum = 0.0;
Double sum = 0.0;
Long originGoodsId = Long.parseLong(item.split(":")[0]);
for (String predictGoods : predictSimGoodsIds.get(originGoodsId).split(",")) {
Long goodsId = Long.parseLong(predictGoods.split(":")[0].toString());
Double sim = Double.valueOf(predictGoods.split(":")[1].toString());
totalSum = totalSum + sim;
Double score = 0.0;
if (!userTrace.containsKey(goodsId)) {
//TODO 用户评分矩阵过于稀疏,需要svd补充评分,暂时无评分score为默认0.1
userTrace.put(goodsId, "default");
}
String action = userTrace.get(goodsId);
if (action.equals("click")) {
score = 0.2;
} else if (action.equals("favorate")) {
} else if (action.equals("add_cart")) {
score = 0.6;
} else if (action.equals("order")) {
score = 0.8;
} else if (action.equals("default")) {
score = 0.1;
}
//相似度词典应存 goodsid:sim格式,要重构
sum = sum + score * sim;
}
Double predictResult = sum / totalSum;
recommendMap.put(originGoodsId, predictResult);
}
//sort recommend list
List> list = new ArrayList>(recommendMap.entrySet());
Collections.sort(list, new Comparator>() {
@Override
public int compare(Map.Entry o1, Map.Entry o2) {
return o2.getValue().compareTo(o1.getValue());
}
});
Map.Entry tmpEntry = null;
Iterator> iter = list.iterator();
while (iter.hasNext()) {
tmpEntry = iter.next();
sortedMap.put(tmpEntry.getKey(), tmpEntry.getValue());
}
writeRecommendListToRedis(userId, sortedMap);
return sortedMap;
}
private HashMap getUserTrace(String userId) throws ClassNotFoundException {
//SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
Class.forName("com.mysql.jdbc.Driver");
PreparedStatement stmt = null;
Connection conn = null;
UserTrace userTrace = new UserTrace();
try {
conn = DriverManager.getConnection(HIGO_BASE_URL, HIGO_BASE_USER, HIGO_BASE_PASS);
String sql = "select * from t_pandora_goods_record where account_id=" + userId;
stmt = (PreparedStatement)conn.prepareStatement(sql);
ResultSet rs = stmt.executeQuery();
while(rs.next()) {
userTrace.setId(Long.parseLong(rs.getString(1)));
userTrace.setAccountId(Long.parseLong(rs.getString(2)));
userTrace.setGoodsIds(rs.getString(3));
userTrace.setMtime(rs.getString(4));
}
stmt.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
String[] goodsActionTimestamp = userTrace.getGoodsIds().split(",");
HashMap hm = new HashMap();
for (String ac : goodsActionTimestamp) {
Long goodsId = Long.parseLong(ac.split(":")[0]);
//String action = ac.split(":")[1];
//String timestamp = ac.split(":")[2];
//hack 下一步要bi把用户历史行为写入表中, action:goodsId:timestamp格式, timestamp后期将参与权重计算
String action = "click";
hm.put(goodsId, action);
}
return hm;
}
private void writeRecommendListToRedis(String userId, LinkedHashMap sortedMap) {
String recommendList = "";
int count = 0;
for (Map.Entry entry : sortedMap.entrySet()) {
recommendList = recommendList + entry.getKey();
if (count == sortedMap.size() - 1) {
break;
}
count = count + 1;
recommendList = recommendList + ",";
}
RedisHelper.getInstance().hset(recommendTable, userId, recommendList);
}
}
spark kafka java api_java实现spark streaming与kafka集成进行流式计算相关推荐
- 大数据学习系列----基于Spark Streaming流式计算
2019独角兽企业重金招聘Python工程师标准>>> 个性化的需求 随着互联网知识信息指数级膨胀,个性化的需求对于用户来说越来越重要,通过推荐算法和用户点击行为的流式计算可以很简单 ...
- Spark Streaming 流式计算实战
这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...
- 流式计算的代表:Storm、Flink、Spark Streaming
learn from 从0开始学大数据(极客时间) 文章目录 1. Storm 2. Spark Streaming 3. Flink 对存储在磁盘上的数据进行大规模计算处理,大数据批处理 对实时产生 ...
- 专访阿里云高级技术专家吴威:Kafka、Spark和Flink类支持流式计算的软件会越来越流行...
杭州·云栖大会将于2016年10月13-16日在云栖小镇举办,在这场标签为互联网.创新.创业的云计算盛宴上,众多行业精英都将在这几天里分享超过450个演讲主题. 为了帮助大家进一步了解这场全球前言技术 ...
- spark 流式计算_流式传输大数据:Storm,Spark和Samza
spark 流式计算 有许多分布式计算系统可以实时或近实时处理大数据. 本文将从对三个Apache框架的简短描述开始,并试图对它们之间的某些相似之处和不同之处提供一个快速的高级概述. 阿帕奇风暴 在风 ...
- Spark流式计算概念
流式计算 特点: 数据是⽆界的(unbounded) 数据是动态的 计算速度是⾮常快的 计算不⽌⼀次 计算不能终⽌ 相对应,离线计算特点: 数据是有界的(Bounded) 数据静态的 计算速度通常较慢 ...
- python 流式计算框架_流式计算的三种框架:Storm、Spark和Flink
我们知道,大数据的计算模式主要分为批量计算(batch computing).流式计算(stream computing).交互计算(interactive computing).图计算(graph ...
- 流式计算的三种框架:Storm、Spark和Flink
我们知道,大数据的计算模式主要分为批量计算(batch computing).流式计算(stream computing).交互计算(interactive computing).图计算(graph ...
- Kafka设计解析(七)- 流式计算的新贵 Kafka Stream
http://www.infoq.com/cn/articles/kafka-analysis-part-7 Kafka Stream背景 Kafka Stream是什么 Kafka Stream是A ...
最新文章
- 第六章-Hadoop优化与发展
- VS2010 MFC多文档中的工具栏CMFCToolBar停靠的问题
- 百度音乐 android,千千音乐(com.ting.mp3.android) - 8.2.3.4 - 应用 - 酷安
- 在Windows平台上安装Dubbox框架
- 数组的去重-----------------------来自大牛的讲解
- HDU1276 士兵队列训练问题【模拟+array+vector+list】
- 安装Windows版C / GCC编译器
- 区块链技术对大数据有哪些影响
- 九大内置对象及四个域对象的总结
- 用计算机和电视机组成家庭影院,请问家庭影院的音响能接在电脑上用么?我的电脑没有配置音响能用家庭 爱问知识人...
- 中国地质大学网络计算机考试试题,中国地质大学《计算机》考试题答案
- 使用拟合方法实现光敏电阻传感器数值与光照强度的近似转换
- MySQL第七章之后的
- 高通设备进入高通9008模式
- No silver bullet——没有银弹理论
- 斯坦福21秋季:实用机器学习-李沐课程笔记
- Python学习,用python制作字符版gif图
- 管理每日日程提醒以及待办清单的备忘便签有哪些
- 出入库管理系统php,php销售供应链管理系统
- SpringCloud快速上手
热门文章
- 免费直播:1 小时带你体验 Python 车牌识别实战
- “抗疫”新战术:世卫组织联合IBM、甲骨文、微软构建了一个开放数据的区块链项目!...
- 如何用Python画一只机器猫?| 原力计划
- 武汉新增职位数同比下降44.25%,这些企业却在猛招人,“来多少,要多少”
- 华为百度美团驰援抗击疫情;自由软件基金会建议开源 Windows 7;印度超越美国成第二大智能手机市场 | 极客头条...
- 召唤新一代超参调优开源新神器,集十大主流模块于一身
- 华为顶尖应届生最高年薪超 200 万;抖音服务器宕机;GitLab 12.1 发布 | 极客头条...
- 行,这本 Python 书彻底火了!
- 人工智能是 6G 诞生的关键!| 极客头条
- 亚马逊不仅将弃用 Oracle,还要抢 Java 饭碗!