使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
接 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)
https://georgedage.blog.csdn.net/article/details/103508619
先对上篇做一个回顾,在上一篇我们编写消费者,并且使用sparkStreaming对kafka中的数据进行批处理。
这篇对我们接收来的数据进行一个指标的统计。
这是原始的数据
1576139418467 4 1 henan kaifeng
1576139419467 3 0 hebei handan
1576139420467 3 2 henan kaifeng
1576139421467 0 0 beijing daxing
1576139422467 3 0 beijing daxing
1576139423467 1 2 henan zhengzhou
1576139424467 4 2 henan kaifeng
1576139425477 0 0 henan kaifeng
1576139426483 0 0 beijing haidian
1576139427483 3 0 beijing haidian
1576139428483 4 2 henan kaifeng
1576139429483 1 2 henan kaifeng
1576139430483 1 0 guangzhou zhongshan
1576139431483 3 0 henan zhengzhou
1576139432483 4 0 guangzhou zhuhai
1576139433483 1 0 guangzhou zhongshan
对时间戳进行转化,然后统计这一天中访问的同一ip多次点击同一广告id的前几位,可以经过一个阈值的过滤,将其列为黑名单。也就是我们所生产的日志,例如你进行搜索时百度第一页的某广告,点击是需要向百度进行付款的,所以我们对于恶意点击者进行拉黑处理。算是一个风控措施。
友情提示:如果按照之前我们对于生产者的随机数的话,不容易看到效果,所以讲生产者代码中随机数范围进行缩小。
消费者代码如下:
package com.kafkaimport java.text.SimpleDateFormat
import java.util.Dateimport org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object KafkaConsumerStreamingDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[3]").setAppName("kks")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(5))sc.setLogLevel("error")val topic = List("george")val map = Map("bootstrap.servers" -> "henu2:9092","group.id" -> "george","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")ssc.checkpoint("D:\\ckpoint")val ds = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic, map))val mapDs = ds.map(_.value())val sdf = new SimpleDateFormat("yyyy-MM-dd")val userDs: DStream[(String, Int)] = mapDs.transform(x => x.map(line => {val arr = line.split(" ")val date = sdf.format(new Date(arr(0).toLong))val userId = arr(1)val adId = arr(2)(date + "," + userId + "," + adId, 1)}))val reduceDs = userDs.reduceByKey(_+_)val resDs: DStream[(String, Int)] = reduceDs.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {val now = currentValues.sumval pre = preValue.getOrElse(0)Option(now + pre)})resDs.print()ssc.start()ssc.awaitTermination()}
}
部分结果展示:
(2019-12-12,2,2,1)
(2019-12-12,4,0,3)
(2019-12-12,1,0,4)
(2019-12-12,3,1,1)
(2019-12-12,4,1,3)
(2019-12-12,3,2,2)
(2019-12-12,2,0,2)
(2019-12-12,4,2,2)
(2019-12-12,2,1,1)
(2019-12-12,0,0,5)
后续就是我们假如将一天内单个ip同一adid的访问数值大于100的进行过滤处理,然后存储即可。下回分解!!!
使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)相关推荐
- 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)
继续接 使用idea编写消费者,接收生产者的持续日志输出[小案例](三) https://georgedage.blog.csdn.net/article/details/103506165 ...
- 用idea编写代码作为生产者,Kafka接收其【持续】发来的广告日志信息【小案例】(二)
接我们上一篇使用idea编写代码作为生产者,Kafka接收其发来的信息[小案例](一) https://georgedage.blog.csdn.net/article/details/1035034 ...
- 修改kafka中某一主题的数据存留时间
我们都知道,kafka中默认消息的保留时间是7天,若想更改,需在配置文件 server.properties里更改选项: log.retention.hours=168 但是有的时候我们需要对某一个主 ...
- flink实时消费kafka中oracle的DML数据写入mysql
1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...
- SparkStreaming消费kafka数据时出现序列化问题 org.apache.kafka.common.serialization.StringDeserializer could not b
问题呈现 Invalid value org.apache. kafka.common.serialization.StringSerializer for configuration key.ser ...
- kafka jar包_Windows环境下Flink消费Kafka实现热词统计
前言碎语 昨天博主写了<windows环境下flink入门demo实例>实现了官方提供的最简单的单词计数功能,今天升级下,将数据源从socket流换成生产级的消息队列kafka来完成一样的 ...
- SparkStreaming整合Kafka(0.8.2.1)计算不同业务指标并实现累加(结合Redis)
业务是订单成交信息,要求计算出成交总金额,每一类商品的金额,区域成交的金额这三个指标. 数据格式:C 202.102.152.3 家具 婴儿床 2000 SparkStreaming读取Kafka中的 ...
- SparkStreaming读取Kafka的Json数据然后保存到MySQL
一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...
- sparkStreaming连接kafka整合hbase和redis
sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时 import org.apache.hadoop.hbase.client.{Admin, Con ...
最新文章
- android读取大图片并缓存
- 如何查看,当运行一个hibernate 方法后到底执行了哪些SQL语句
- less新手入门(四)—— Mixin Guards
- com.mysql.jdbc.NotUpdatable: Result Set not updatable (references no primary keys).(解决方法)
- 基于SVD的降维优化
- 程序员应具备的职业素质
- 如何使用T-SQL生成随机SQL Server测试数据
- 最好的ppt转pdf软件
- es String 内部实现逻辑标准
- 【JSTL】<c:if test=“”>没有else的解决方法
- JavaScript组成
- 基于JMP的神经网络设计案例分析
- java导出excel水印_java解决poi导出excel文字水印,导出excel不可操作问题
- 最新版本kindle安卓app导入mobi图书和设置自定义字体的方法2020.01.09
- 一、Fiddler抓包工具 — Fiddler介绍与安装
- linux7怎么关闭更新,如何让centos7关闭yum自动更新系统
- 2018新年计划-雄起
- linux 小括号 中括号 双小括号 双中括号
- 城市交通公众号2021年头部内容发布
- 【open3d】安装open3d.whl之后,import报错ModuleNotFoundError: No module named ‘open3d.cpu‘