一、需求3:每隔5秒统计最近1小时内广告的点击量---增量
二、实现步骤

### --- 实现步骤~~~     获取数据源(input)--- flume
~~~     转化
~~~     数据源的格式:area:uid:product_id:time:  样例类中AdClick;过滤操作filter product_id != null
~~~     .水印Watemark、.keyBy(productId) 、.timeWindow
~~~     .aggregate(MyAggFunc,MyWindowFunc)
~~~     MyAggFunc:编写计算逻辑的代码---- 累加广告的点击次数
~~~     MyWindowFunc:apply,将结果数据向下游传递

三、增量统计广告点击率:每隔5秒统计最近1小时内广告的点击量$增量统计
### --- MyAggFunc:编写计算逻辑的代码--累加广告的点击次数{"yanqi_event": [{"name": "goods_detail_loading","json": {"entry": "2","goodsid": "0","loading_time": "92","action": "3","staytime": "10","showtype": "0"},"time": 1595265099584},{"name": "notification","json": {"action": "1","type": "3"},"time": 1595341087663},{"name": "ad","json": {"duration": "10","ad_action": "0","shop_id": "23","event_type": "ad","ad_type": "1","show_style": "0","product_id": "36","place": "placecampaign2_left","sort": "1"},"time": 1595276738208}],"attr": {"area": "东莞","uid": "2F10092A0","app_v": "1.1.0","event_type": "common","device_id": "1FB872-9A1000","os_type": "1.1","channel": "广宣","language": "chinese","brand": "iphone-0"}
}

四、数据类型转换:
### --- ODS:kafka中eventlog中;DIM:无;DWD:event.log文件说明:不规范json格式,需要转换{"data": [{"id": "6","payMethod": "meituan","payName": "美团支付","description": "美团支付","payOrder": "0","online": "-1"}],"database": "dwshow","es": 1604461572000,"id": 6,"isDdl": false,"mysqlType": {"id": "int(11)","payMethod": "varchar(20)","payName": "varchar(255)","description": "varchar(255)","payOrder": "int(11)","online": "tinyint(4)"},"old": null,"pkNames": null,"sql": "","sqlType": {"id": 4,"payMethod": 12,"payName": 12,"description": 12,"payOrder": 4,"online": -6},"table": "yanqi_payments","ts": 1604461572297,"type": "INSERT"
}

{"yanqi_event": [{"name": "goods_detail_loading","json": {"entry": "2","goodsid": "0","loading_time": "92","action": "3","staytime": "10","showtype": "0"},"time": 1595265099584},{"name": "notification","json": {"action": "1","type": "3"},"time": 1595341087663},{"name": "ad","json": {"duration": "10","ad_action": "0","shop_id": "23","event_type": "ad","ad_type": "1","show_style": "0","product_id": "36","place": "placecampaign2_left","sort": "1"},"time": 1595276738208}],"attr": {"area": "东莞","uid": "2F10092A0","app_v": "1.1.0","event_type": "common","device_id": "1FB872-9A1000","os_type": "1.1","channel": "广宣","language": "chinese","brand": "iphone-0"}
}

### --- 转换代码//对Kafka中的JSON日志进行转换val mapEventStream: DataStream[AdClick] = eventLogStream.map(x => {val jsonObj: JSONObject = JSON.parseObject(x)val attr: String = jsonObj.get("attr").toStringval attrJson: JSONObject = JSON.parseObject(attr)val area: String = attrJson.get("area").toStringval uid: String = attrJson.get("uid").toString//[{"name":"praise","json":{"id":0,"type":4,"add_time":"1597851188753","userid":0,"target":8},// "time":1595329059805}]// 此处的时间戳是毫秒val eventData: String = jsonObj.get("yanqi_event").toStringval datas: JSONArray = JSON.parseArray(eventData)val list = new java.util.ArrayList[String]()datas.forEach(x => list.add(x.toString))var productId: String = nullvar timestamp: Long = 0Llist.forEach(x => {//{"name":"ad","json":{"duration":"10","ad_action":"0","shop_id":"23","event_type":"ad","ad_type":"1",//"show_style":"0","product_id":"36","place":"placecampaign2_left","sort":"1"},"time":1595276738208}val xJson: JSONObject = JSON.parseObject(x)if (xJson.get("name").toString.equals("ad")) {val jsonData: String = xJson.get("json").toStringval jsonDatas = JSON.parseObject(jsonData)productId = jsonDatas.get("product_id").toStringtimestamp = TimeUnit.MILLISECONDS.toSeconds(xJson.get("time").toString.toLong)}})AdClick(area, uid, productId, timestamp)
})

CC00047.bdpositions——|Hadoop实时数仓.V27|——|项目.v27|需求三:数据处理增量统计广告.V1|——|需求分析|相关推荐

  1. CC00043.bdpositions——|Hadoop实时数仓.V23|——|项目.v23|需求二:数据处理增量统计.V1|——|需求分析|

    一.需求2:每隔5分钟统计最近1小时内的订单交易情况,显示城市/省份/交易总金额/订单总数---增量统计 二.编程实现流程 ### --- 读取数据源(input)~~~ # input读取数据源: ...

  2. 原创|实时数仓实战项目-第三节(数仓治理)

    马上就要到国庆节了,提前祝大家国庆快乐,最近比较忙,考虑到粉丝一直要求我更新文章,我今天就加班更新一下文章. 实时数仓如何做数据治理 在做技术分享之前,我就尽量画图,少写字,最好能让大家看一天图,就能 ...

  3. 大数据Flink电商实时数仓实战项目流程全解(六)DWM层业务实现

    项目概要 之前我们已经通过动态分流把数据分到了我们想要的位置,为了方便后续内容的讲解方便,所以接下来我们可以把配置表的信息进行导入了,然后通过动态分流的方法,把数据发往对应的kafka主题或者是hba ...

  4. 实时数仓 大数据 Hadoop flink kafka

    ⼀.实时数仓建设背景 实时需求⽇趋迫切 ⽬前各⼤公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能⼒来赋能.传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时 ...

  5. 揭秘!阿里实时数仓分布式事务Scale Out设计

    简介: Hybrid Transaction Analytical Processing(HTAP) 是著名信息技术咨询与分析公司Gartner在2014年提出的一个新的数据库系统定义,特指一类兼具O ...

  6. 从阿里核心场景看实时数仓的发展趋势

    简介:随着2021年双11的完美落幕,实时数仓技术在阿里双11场景也经历了多年的实践和发展.从早期的基于不同作业的烟囱式开发,到基于领域分层建模的数仓引入,再到分析服务一体化的新型融合式一站式架构,开 ...

  7. 快手基于 Flink 构建实时数仓场景化实践

    摘要:本文整理自快手数据技术专家李天朔在 5 月 22 日北京站 Flink Meetup 分享的议题<快手基于 Flink 构建实时数仓场景化实践>,内容包括: 快手实时计算场景 快手实 ...

  8. 20000字详解大厂实时数仓建设(好文收藏)

    一.实时数仓建设背景 1. 实时需求日趋迫切 目前各大公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能力来赋能.传统离线数仓的数据时效性是 T+1,调度频率以天为单位,无法支撑 ...

  9. 他山之石|大厂实时数仓建设全解析

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...

最新文章

  1. [svc]caffe安装笔记-显卡购买
  2. 鸽巢原理(抽屉原理)的详解
  3. openstack学习笔记三 创建第一个实例
  4. 12.10课堂学习----实例化、构造方法案例
  5. 阿里云MVP:如何设计实现一个通用的微服务架构?
  6. qt android 设备权限,QtScrcpy: Android实时投屏软件,此应用程序提供USB(或通过TCP/IP)连接的Android设备的显示和控制。它不需要任何root访问权限...
  7. 12-17 学习记录
  8. 画直线的算法之DDA算法+代码实现(法一)
  9. kettle | error working with XUL definition
  10. 研究称在家中工作可提高员工工作效率
  11. 如何把用Word文档转换成PNG图片
  12. 考研专业类计算机是什么类别,计算机类考研科目种类都有哪些分别是什么
  13. 读书笔记2014第11本:历史上最伟大的10个方程
  14. el-menu菜单下划线解决办法
  15. 2019-11软考报名网站汇总,陆续更新
  16. 国内投资者投资港股的四种方法和港股必知25件事
  17. github镜像网站_Jenkins把GitHub项目做成Docker镜像
  18. java并发编程之再学习
  19. elasticsearch的.security-7索引崩溃恢复笔记
  20. 终端怎么退出python命令行

热门文章

  1. 微信小程序入门教程+案例demo
  2. rasa.exceptions.ModelNotFound: No NLU or Core data for unpacked model at:
  3. Chinese New Year
  4. Visual Studio C++ 输出调试信息在调试-输出窗口
  5. linux dns chroot,chroot环境下dns解析不好用
  6. Python的学习(二十一)----Python的静态变量
  7. Java线程状态总结
  8. 谷歌翻译用不了,失效的最新解决方法之一
  9. hexo笔记十一:next主题添加留言页面
  10. 认证资料大全(八)------ SUN认证列表