一、广告点击流量实时统计模块介绍

网站 / app,是不是通常会给一些第三方的客户,打一些广告;也是一些互联网公司的核心收入来源;广告在网站 / app某个广告位打出去,在用户来使用网站 / app的时候,广告会显示出来;此时,有些用户可能就会去点击那个广告。

广告被点击以后,实际上,我们就是要针对这种用户行为(广告点击行为),实时数据,进行实时的大数据计算和统计。

每次点击一个广告以后,通常来说,网站 / app中都会有埋点(前端的应用中,比如JavaScript Ajax;app中的socket网络请求,往后台发送一条日志数据);日志数据而言,通常,如果要做实时统计的话,那么就会通过某些方式将数据写入到分布式消息队列中(Kafka);

日志写入到后台web服务器(nginx),nginx产生的实时的不断增加 / 更新的本地日志文件,就会被日志监控客户端(比如flume agent),写入到消息队列中(kafka),我们要负责编写实时计算程序,去从消息队列中(kafka)去实时地拉取数据,然后对数据进行实时的计算和统计。

这个模块的意义在于,让产品经理、高管可以实时地掌握到公司打的各种广告的投放效果。以便于后期持续地对公司的广告投放相关的战略和策略,进行调整和优化;以期望获得最好的广告收益。

二、需求分析:

1、实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑
2、基于黑名单的非法广告点击流量过滤机制:
3、每天各省各城市各广告的点击流量实时统计:
4、统计每天各省top3热门广告
5、统计各广告最近1小时内的点击量趋势:各广告最近1小时内各分钟的点击量
6、使用高性能方式将实时统计结果写入MySQL
7、实现实时计算程序的HA高可用性(Spark Streaming HA方案)
8、实现实时计算程序的性能调优(Spark Streaming Performence Tuning方案)

三、数据设计

3.1、数据格式介绍:

timestamp   1450702800
province    Jiangsu
city    Nanjing
userid  100001
adid    100001相关表
每个用户对某个广告的点击量
CREATE TABLE `ad_user_click_count` (`date` varchar(30) DEFAULT NULL,`user_id` int(11) DEFAULT NULL,`ad_id` int(11) DEFAULT NULL,`click_count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8黑名单
CREATE TABLE `ad_blacklist` (`user_id` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8每天每个省份某个广告的点击数
CREATE TABLE `ad_stat` (`date` varchar(30) DEFAULT NULL,`province` varchar(100) DEFAULT NULL,`city` varchar(100) DEFAULT NULL,`ad_id` int(11) DEFAULT NULL,`click_count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8top3
CREATE TABLE `ad_province_top3` (`date` varchar(30) DEFAULT NULL,`province` varchar(100) DEFAULT NULL,`ad_id` int(11) DEFAULT NULL,`click_count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8广告的点击趋势,
CREATE TABLE `ad_click_trend` (`date` varchar(30) DEFAULT NULL,`hour` varchar(30) default null,`minute` varchar(30) DEFAULT NULL,`ad_id` int(11) DEFAULT NULL,`click_count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

四、技术方案设计及具体实现

4.1、实时计算各batch中的每天各用户对各广告的点击次数

4.1.1、创建topic

 ./kafka-topics.sh --zookeeper chb0-179005:2181,chb0-179004:2181,chb1-179006:2181 --describe --topic topic-ad

4.1.2、构件Spark Streaming

        // 构建Spark Streaming上下文SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("AdClickRealTimeStatSpark");//构件Java Streaming ContextJavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));  //每个batch的时间间隔

4.1.3、构件kafkaDirectStream

        // 构建kafka参数map// 主要要放置的就是,你要连接的kafka集群的地址(broker集群的地址列表)Map<String, String> kafkaParams = new HashMap<String, String>();kafkaParams.put(Constants.KAFKA_METADATA_BROKER_LIST, Constants.propsUtils.get(Constants.KAFKA_METADATA_BROKER_LIST));// 构建topic setString kafkaTopics = Constants.propsUtils.get(Constants.KAFKA_TOPICS);String[] kafkaTopicsSplited = kafkaTopics.split(",");  Set<String> topics = new HashSet<String>();for(String kafkaTopic : kafkaTopicsSplited) {topics.add(kafkaTopic);}//构建kafakDStreamJavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

4.1.4、通过kafkaDirectStream获取数据, 对数据进行mapToPair格式化成我们需要的key,

每天每个广告每个用户的点击量, 所以key为yyyyMMdd_userid_adid,value=1
然后reduceyByKey 进行合并。
输出的就是我们需要的结果, 每天每个用户对某个广告的点击数。

 源源不断的,每个5s的batch中,当天每个用户对每支广告的点击次数<yyyyMMdd_userid_adid, clickCount>

4.2、使用高性能方式将每天各用户对各广告的点击次数写入MySQL中(更新)

对于咱们这种实时计算程序的mysql插入,有两种pattern(模式)1、比较挫:每次插入前,先查询,看看有没有数据,如果有,则执行insert语句;如果没有,则执行update语句;好处在于,每个key就对应一条记录;坏处在于,本来对一个分区的数据就是一条insert batch,现在很麻烦,还得先执行select语句,再决定是insert还是update。j2ee系统,查询某个key的时候,就直接查询指定的key就好。2、稍微好一点:每次插入记录,你就插入就好,但是呢,需要在mysql库中,给每一个表,都加一个时间戳(timestamp),对于同一个key,5秒一个batch,每隔5秒中就有一个记录插入进去。相当于在mysql中维护了一个key的多个版本。j2ee系统,查询某个key的时候,还得限定是要order by timestamp desc limit 1,查询最新时间版本的数据通过mysql来用这种方式,不是很好,很不方便后面j2ee系统的使用不用mysql;用hbase(timestamp的多个版本,而且它不却分insert和update,统一就是去对某个行键rowkey去做更新)

4.3、使用filter过滤出每天对某个广告点击超过100次的黑名单用户,并写入MySQL中

4.3.1、首先过滤每个批次中的记录, 在数据库中点击数查过100的记录

        JavaPairDStream<String, Long> blacklistDStream = dailyUserAdClickCountDStream.filter(new Function<Tuple2<String,Long>, Boolean>() {@Overridepublic Boolean call(Tuple2<String, Long> v1) throws Exception {String key = v1._1;String[] keySplited = key.split("_");  // yyyyMMdd -> yyyy-MM-ddString date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited[0]));  long userid = Long.valueOf(keySplited[1]);  long adid = Long.valueOf(keySplited[2]);  // 从mysql中查询指定日期指定用户对指定广告的点击量IAdUserClickCountDAO adUserClickCountDAO = DAOFactory.getAdUserClickCountDAO();int clickCount = adUserClickCountDAO.findClickCountByMultiKey(date, userid, adid);// 判断,如果点击量大于等于10,ok,那么不好意思,你就是黑名单用户// 那么就拉入黑名单,返回trueif(clickCount >= 10) {return true;}// 反之,如果点击量小10的,那么就暂时不要管它了return false;}});

4.3.2、由于我们是获取黑名单, 所以只需要userid, 通过map,只返回userid的RDD

        JavaDStream<Long> blaskListUseridDStream = blacklistDStream.map(new Function<Tuple2<String,Long>, Long>() {@Overridepublic Long call(Tuple2<String, Long> v1) throws Exception {String key = v1._1;String[] keySplited = key.split("_");  Long userid = Long.valueOf(keySplited[1]);  return userid;}});

4.3.3、但是在一个batch中,可以有多个条相同用户的记录, 所以我们需要进行去重

此处利用tranform, transform操作允许将任意RDD到RDD函数应用于DStream

//去重JavaDStream<Long> distblaskListUseridDStream = blaskListUseridDStream.transform(new Function<JavaRDD<Long>, JavaRDD<Long>>() {@Overridepublic JavaRDD<Long> call(JavaRDD<Long> v1) throws Exception {return v1.distinct();}});

4.3.4、通过foreachRDD,将黑名单用户userid存入mysql中。

4.3.5、动态生成的黑名单, 用于日志信息进行过滤

对4.1.4进行升级, 动态获取数据库中的黑名单, 如果是黑名单中的用于, 记录就不用继续考虑了。

4.4、用transform操作,对每个batch RDD进行处理,都动态加载MySQL中的黑名单生成RDD,然后进行join后,过滤掉batch RDD中的黑名单用户的广告点击行为

通过transformToPair, 可以对DStream中每个RDD进行算子操作,
获取数据库中的黑名单, 通过parallelizePairs,转化成backRDD, <userId, true>
为了能够与backRDD进行join, 先将点击行为RDD进行map, 得到<userId, rdd>
join操作, 但是需要注意, 由于点击行为的用于不一定在blackRDD中, 所以需要使用leftOuterJoin
通过filter,过滤处非黑名单中的点击行为信息。
输出我们需要的数据,
    /*** 对源数据进行过滤* @param adRealTimeLogDStream 源数据* @return*/private static JavaPairDStream<String, String> filterByBlackList(JavaPairInputDStream<String, String> adRealTimeLogDStream) {JavaPairDStream<String, String> filterDStream = adRealTimeLogDStream.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {private static final long serialVersionUID = -8650685273099590863L;@Overridepublic JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {//获取黑名单IAdBlacklistDAO adBlacklistDAO = DAOFactory.getAdBlacklistDAO();List<AdBlacklist> adBlacklists = adBlacklistDAO.findAll();List<Tuple2<Long, Boolean>> blackTuples = new ArrayList<Tuple2<Long, Boolean>>();for(AdBlacklist adBlacklist : adBlacklists) {blackTuples.add(new Tuple2<Long, Boolean>(adBlacklist.getUserid(), true));}//转化成一个RDDJavaSparkContext sc = new JavaSparkContext(rdd.context());JavaPairRDD<Long, Boolean> blacListRDD = sc.parallelizePairs(blackTuples);//为了与blackListRDD进行join, 需要将原始数据rdd,转化为<userid, 原始数据>JavaPairRDD<Long, Tuple2<String, String>> mapRDD = rdd.mapToPair(new PairFunction<Tuple2<String,String>, Long, Tuple2<String, String>>() {private static final long serialVersionUID = 7001016275687081936L;@Overridepublic Tuple2<Long, Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {String logInfo = t._2;long userId = Long.valueOf(logInfo.split(" ")[3]);return new Tuple2<Long, Tuple2<String,String>>(userId, t);}});// 将原始日志数据rdd,与黑名单rdd,进行左外连接// 如果说原始日志的userid,没有在对应的黑名单中,join不到,左外连接// 用inner join,内连接,会导致数据丢失JavaPairRDD<Long, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joinRDD = mapRDD.leftOuterJoin(blacListRDD);//过滤JavaPairRDD<Long, Tuple2<Tuple2<String, String>, Optional<Boolean>>> filterRDD = joinRDD.filter(new Function<Tuple2<Long,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {private static final long serialVersionUID = 2749100847991384506L;@Overridepublic Boolean call(Tuple2<Long, Tuple2<Tuple2<String, String>, Optional<Boolean>>> v1)throws Exception {Optional<Boolean> optional = v1._2._2;// 如果这个值存在,那么说明原始日志中的userid,join到了某个黑名单用户if(optional.isPresent() && optional.get()) {return false;}return true;}});JavaPairRDD<String, String> resultRDD = filterRDD.mapToPair(new PairFunction<Tuple2<Long,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {private static final long serialVersionUID = 8270722683260778935L;@Overridepublic Tuple2<String, String> call(Tuple2<Long, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)throws Exception {return t._2._1;}});return resultRDD;}});return filterDStream;}

4.5、使用updateStateByKey操作,实时计算每天各省各城市各广告的点击量,并时候更新到MySQL

4.5.1、通过mapToPair 将源数据格式化(yyyyMMdd_province_city_adid,clickCount)

4.5.2、updateStateByKey统计实时的点击量

        JavaPairDStream<String, Long> aggrDStream = mapDStream.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {@Overridepublic Optional<Long> call(List<Long> values, Optional<Long> optional)throws Exception {long clickCount = 0L;if(optional.isPresent()){clickCount = optional.get();}//values 表示一个batch的所有值for (Long value : values) {clickCount += value;}return Optional.of(clickCount);}});

4.6、使用transform结合Spark SQL,统计每天各省份top3热门广告:首先以每天各省各城市各广告的点击量数据作为基础,首先统计出每天各省份各广告的点击量;然后启动一个异步子线程,使用Spark SQL动态将数据RDD转换为DataFrame后,注册为临时表;最后使用Spark SQL开窗函数,统计出各省份top3热门的广告,并更新到MySQL中

4.7、使用window操作,对最近1小时滑动窗口内的数据,计算出各广告各分钟的点击量,并更新到MySQL中

4.7.1、对过滤后的原始数据,使用mapToPair 映射成<yyyyMMddHHMM_adid,1L>格式

4.7.2、使用windwo operation统计最近一小时的 广告点击量

8、实现实时计算程序的HA高可用性

9、对实时计算程序进行性能调优

广告点击流量实时统计相关推荐

  1. 114.Spark大型电商项目-广告点击流量实时统计-使用高性能方式将实时计算结果写入MySQL中

    目录 误区 Spark Streaming foreachRDD的正确使用方式 对于这种实时计算程序的mysql插入,有两种pattern(模式) 代码 AdUserClickCount.java I ...

  2. SparkStreaming 实时计算 广告点击流量实时统计 需求 技术方案数据库设计

    几大模块内容 用户访问session分析模块:会话(session),用户的基础访问行为 页面单跳转化率模块:页面(page),用户的页面访问和页面跳转行为 各区域热门商品统计模块:商品(produc ...

  3. Spark日志分析项目Demo(8)--SparkStream,广告点击流量实时统计

    广告点击统计需求: (1)对接kafka,获得数据 (2)发现某个用户某天对某个广告的点击量已经大于等于100,写入黑名单,进行过滤 (3)计算广告点击流量实时统计结果 (4)实时统计每天每个省份to ...

  4. 112.Spark大型电商项目-广告点击流量实时统计-需求分析、技术方案设计以及数据设计

    目录 需求分析 技术方案设计 数据表设计 ad_user_click_count //用户点击广告表 ad_blacklist //用户黑名单 ad_stat  //广告状态表 ad_province ...

  5. Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  6. python流量实时统计_Python实现获取nginx服务器ip及流量统计信息功能示例

    本文实例讲述了Python实现获取nginx服务器ip及流量统计信息功能.分享给大家供大家参考,具体如下: #!/usr/bin/python #coding=utf8 log_file = &quo ...

  7. 大数据IMF传奇行动绝密课程第104-114课:Spark Streaming电商广告点击综合案例

    Spark Streaming电商广告点击综合案例 需求分析和技术架构 广告点击系统实时分析 广告来自于广告或者移动App等,广告需要设定在具体的广告位,当用户点击广告的时候,一般都会通过ajax或S ...

  8. 广告效果数据的实时计算与分析(Druid)(一)

    我是做SSP-供应方平台服务的,工作中除了负责SSP 管理后台的需求开发(如,媒体应用和位置的管理.流量分配.效果数据的报表展示.SDK性能分析等)之外,最主要的是负责SSP广告效果数据的实时统计与分 ...

  9. 流量专家为114搜索提供权威流量访问统计

    一:系统介绍 互联网流量实时统计产品是一套网站流量统计分析系统.致力于为所有网站.第三方统计等用户提供网站流量监控.统计.分析等专业服务. 通过互联网流量实时统计产品 ,站长可以随时知道自己网站的被访 ...

最新文章

  1. StringBuffer 案例
  2. 都说程序员是吃青春饭!
  3. 【前端】【cornerstone】【未完善】cornerstone重新加载图像大小问题——拒绝花里胡哨
  4. Lecture 1:强化学习简介
  5. Android Studio 打包AAR和第三方静态库
  6. DLL错误之——无法加载DLL***.dll:找不到指定的模块(异常来自HRESULT:0x8007007E)问题的终极感悟
  7. 微信Android端如何安全降级
  8. 激光传感器构建栅格地图
  9. 亚马逊云科技云知识总结
  10. java-不死神兔百钱百鸡
  11. Android各版本分布
  12. 百度IFE前端学院-DAY1-Web开发概览
  13. 期货开户需要具备⼀定的条件
  14. 内存管理(一)MRC
  15. Java基础易错面试题,初级程序员面试必看!(会不断更新)
  16. 罗斯蒙特3051S1CD3A2E12A1AB4D2M5变送器
  17. 分享 7 个 AI 优质开源项目!文本生成、自动化数据搜集...
  18. (一)永磁同步电机矢量控制(三电平)——浅谈三电平逆变器
  19. 让AI 作画更快一点
  20. Rabbitmq 入坑教程

热门文章

  1. 【c#视频】——面向对象——多态
  2. pytorch学习(五)---torch.nn模块
  3. 工作小结:端正态度,细心!
  4. 优盘制作服务器引导盘,USBOS V3.0彪悍版U盘启动盘制作工具-用于PC/工控机/服务器/Surface/Mac...
  5. 简单明了实现Java地图小程序项目
  6. ACWing算法提高课 友好城市
  7. 中国石油大学《社会学概论》第二次在线作业
  8. java.lang.IllegalArgumentException: not found @HttpResponse from class java.lang.Object解决方法
  9. 计算正整数的阶乘math.factorial()
  10. js-只能输入数字(正则)