接    使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)

https://georgedage.blog.csdn.net/article/details/103508619

先对上篇做一个回顾,在上一篇我们编写消费者,并且使用sparkStreaming对kafka中的数据进行批处理。

这篇对我们接收来的数据进行一个指标的统计。

这是原始的数据

1576139418467 4 1 henan kaifeng
1576139419467 3 0 hebei handan
1576139420467 3 2 henan kaifeng
1576139421467 0 0 beijing daxing
1576139422467 3 0 beijing daxing
1576139423467 1 2 henan zhengzhou
1576139424467 4 2 henan kaifeng
1576139425477 0 0 henan kaifeng
1576139426483 0 0 beijing haidian
1576139427483 3 0 beijing haidian
1576139428483 4 2 henan kaifeng
1576139429483 1 2 henan kaifeng
1576139430483 1 0 guangzhou zhongshan
1576139431483 3 0 henan zhengzhou
1576139432483 4 0 guangzhou zhuhai
1576139433483 1 0 guangzhou zhongshan

对时间戳进行转化,然后统计这一天中访问的同一ip多次点击同一广告id的前几位,可以经过一个阈值的过滤,将其列为黑名单。也就是我们所生产的日志,例如你进行搜索时百度第一页的某广告,点击是需要向百度进行付款的,所以我们对于恶意点击者进行拉黑处理。算是一个风控措施。

友情提示:如果按照之前我们对于生产者的随机数的话,不容易看到效果,所以讲生产者代码中随机数范围进行缩小。

消费者代码如下:

package com.kafkaimport java.text.SimpleDateFormat
import java.util.Dateimport org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object KafkaConsumerStreamingDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[3]").setAppName("kks")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(5))sc.setLogLevel("error")val topic = List("george")val map = Map("bootstrap.servers" -> "henu2:9092","group.id" -> "george","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")ssc.checkpoint("D:\\ckpoint")val ds = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic, map))val mapDs = ds.map(_.value())val sdf = new SimpleDateFormat("yyyy-MM-dd")val userDs: DStream[(String, Int)] = mapDs.transform(x => x.map(line => {val arr = line.split(" ")val date = sdf.format(new Date(arr(0).toLong))val userId = arr(1)val adId = arr(2)(date + "," + userId + "," + adId, 1)}))val reduceDs = userDs.reduceByKey(_+_)val resDs: DStream[(String, Int)] = reduceDs.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {val now = currentValues.sumval pre = preValue.getOrElse(0)Option(now + pre)})resDs.print()ssc.start()ssc.awaitTermination()}
}

部分结果展示:

(2019-12-12,2,2,1)
(2019-12-12,4,0,3)
(2019-12-12,1,0,4)
(2019-12-12,3,1,1)
(2019-12-12,4,1,3)
(2019-12-12,3,2,2)
(2019-12-12,2,0,2)
(2019-12-12,4,2,2)
(2019-12-12,2,1,1)
(2019-12-12,0,0,5)

后续就是我们假如将一天内单个ip同一adid的访问数值大于100的进行过滤处理,然后存储即可。下回分解!!!

使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)相关推荐

  1. 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)

    继续接     使用idea编写消费者,接收生产者的持续日志输出[小案例](三) https://georgedage.blog.csdn.net/article/details/103506165 ...

  2. 用idea编写代码作为生产者,Kafka接收其【持续】发来的广告日志信息【小案例】(二)

    接我们上一篇使用idea编写代码作为生产者,Kafka接收其发来的信息[小案例](一) https://georgedage.blog.csdn.net/article/details/1035034 ...

  3. 修改kafka中某一主题的数据存留时间

    我们都知道,kafka中默认消息的保留时间是7天,若想更改,需在配置文件 server.properties里更改选项: log.retention.hours=168 但是有的时候我们需要对某一个主 ...

  4. flink实时消费kafka中oracle的DML数据写入mysql

    1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...

  5. SparkStreaming消费kafka数据时出现序列化问题 org.apache.kafka.common.serialization.StringDeserializer could not b

    问题呈现 Invalid value org.apache. kafka.common.serialization.StringSerializer for configuration key.ser ...

  6. kafka jar包_Windows环境下Flink消费Kafka实现热词统计

    前言碎语 昨天博主写了<windows环境下flink入门demo实例>实现了官方提供的最简单的单词计数功能,今天升级下,将数据源从socket流换成生产级的消息队列kafka来完成一样的 ...

  7. SparkStreaming整合Kafka(0.8.2.1)计算不同业务指标并实现累加(结合Redis)

    业务是订单成交信息,要求计算出成交总金额,每一类商品的金额,区域成交的金额这三个指标. 数据格式:C 202.102.152.3 家具 婴儿床 2000 SparkStreaming读取Kafka中的 ...

  8. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  9. sparkStreaming连接kafka整合hbase和redis

    sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时 import org.apache.hadoop.hbase.client.{Admin, Con ...

最新文章

  1. android读取大图片并缓存
  2. 如何查看,当运行一个hibernate 方法后到底执行了哪些SQL语句
  3. less新手入门(四)—— Mixin Guards
  4. com.mysql.jdbc.NotUpdatable: Result Set not updatable (references no primary keys).(解决方法)
  5. 基于SVD的降维优化
  6. 程序员应具备的职业素质
  7. 如何使用T-SQL生成随机SQL Server测试数据
  8. 最好的ppt转pdf软件
  9. es String 内部实现逻辑标准
  10. 【JSTL】<c:if test=“”>没有else的解决方法
  11. JavaScript组成
  12. 基于JMP的神经网络设计案例分析
  13. java导出excel水印_java解决poi导出excel文字水印,导出excel不可操作问题
  14. 最新版本kindle安卓app导入mobi图书和设置自定义字体的方法2020.01.09
  15. 一、Fiddler抓包工具 — Fiddler介绍与安装
  16. linux7怎么关闭更新,如何让centos7关闭yum自动更新系统
  17. 2018新年计划-雄起
  18. linux 小括号 中括号 双小括号 双中括号
  19. 城市交通公众号2021年头部内容发布
  20. 【open3d】安装open3d.whl之后,import报错ModuleNotFoundError: No module named ‘open3d.cpu‘

热门文章

  1. 洛谷 - P1361 小M的作物(最大流最小割)
  2. java 网络驱动器_删除多余的网络驱动器
  3. router vue 动态改变url_vue动态路由
  4. QT乱码总结4.细谈本地编码
  5. 获取网络时间并刷新本地时间(源码2)
  6. Eclipse CDT Hello World工程makefile分析
  7. MFC内嵌web页面
  8. 第01讲:必知必会,掌握 HTTP 基本原理
  9. Python识别图片的清晰度
  10. MySQL(一)SQL执行流程与MySQL架构