Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤
Transformations on DStreams之transform 实现黑名单操作/指定过滤
官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
transform(func)
Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary(任意的) RDD operations on the DStream.
rdd实现
package g5.learningimport org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject BlackListApp {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[2]").setAppName(" LogAppscala")val sc = new SparkContext(sparkConf)val input =new ListBuffer[(String,String)]
//相当于输入数据input.append(("doudou","doudou info"))input.append(("huahua","huahua info"))input.append(("zhang","zhanginfo"))input.append(("xiaoxiao","xiaoxiao info"))
val inputRDD =sc.parallelize(input)//转化成RDDval blackTuple = new ListBuffer[(String,Boolean)]
blackTuple.append(("doudou",true))
val blackRDD =sc.parallelize(blackTuple)inputRDD.leftOuterJoin(blackRDD).filter(x =>{x._2._2.getOrElse(false)!=true
}).map(_._2._1).collect().foreach(println)sc.stop()}
}
transform的实现
package g5.learningimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable.ListBufferobject TransformApp {def main(args: Array[String]): Unit = {//准备工作val conf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")val ssc = new StreamingContext(conf, Seconds(10))val blackTuple = new ListBuffer[(String,Boolean)]blackTuple.append(("doudou",true))val blackRDD =ssc.sparkContext.parallelize(blackTuple)这里发现sc没有,要用ssc.sparkContext来获取//业务逻辑val lines = ssc.socketTextStream("hadoop001", 9999)lines.map(x=>(x.split(",")(0),x)).transform(rdd =>{rdd.leftOuterJoin(blackRDD).filter(x =>{x._2._2.getOrElse(false)!=true}).map(_._2._1)}).print()//streaming的启动ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate}
}
在这之前要nc -lk 9999
将他启动
主要:
是通过transform这个算子,把rdd和streaming联系到一起,单独使用这个streaming是完成不了的,
扩展:
不停作业读,把你的东西配到数据库里面就可以了,让他定时的读数据库,索要上线的配到数据库里面就可以了
假设有2000个,你今天配一个,明天配2个,,,,难道你的表里面配2000个才能上线么
肯定不是的,对于这种场景,肯定是不行的,这里就需要你个开关
你今天2000个都上完了,明天又来了50个,难道还要加到数据库里面去吗
2000上完,说明你整个功能没有问题了,后面就加的话就做一个开关
开关是什么意思呢
要读配置,是否读配置 0 1
–conf spark.filter.switch 放开关
–conf spark.filter.domains 放域名
好处,每次上线就不用再配置到这张表里面去了
生产用的非常多
Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤相关推荐
- Transformations on DStreams之updateStateByKey 的使用和状态累加
Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤 https://blog.csdn.net/qq_43688472/article/deta ...
- 《认清C++语言》的random_shuffle()和transform()算法
1)STL中的函数random_shuffle()用来对一个元素序列进行重新排序(随机的),包含在头文件 algorithm.h中 函数原型如下: template<class RandomAc ...
- 《认清C++语言》のrandom_shuffle()和transform()算法
1)STL中的函数random_shuffle()用来对一个元素序列进行重新排序(随机的),函数原型如下: template<class RandomAccessIterator> voi ...
- CSS3与页面布局学习笔记(六)——CSS3新特性(阴影、动画、渐变、变形( transform)、透明、伪元素等)...
一.阴影 1.1.文字阴影 text-shadow <length>①: 第1个长度值用来设置对象的阴影水平偏移值.可以为负值 <length>②: 第2个长度值用来设置对象的 ...
- pandas数据分组聚合——groupby()、aggregate()、apply()、transform()和filter()方法详解
数据分组 数据分组就是根据一个或多个键(可以是函数.数组或df列名)将数据分成若干组,然后对分组后的数据分别进行汇总计算,并将汇总计算后的结果进行合并,被用作汇总计算的函数称为聚合函数.数据分组的具体 ...
- [Rotation Transform] 旋转变换
[Rotation Transform] 旋转变换 旋转矩阵 绕当前坐标系指定轴的变换矩阵 给定欧拉角的变换 欧拉角表示旋转的缺点 四元数与旋转 旋转顺序的验证 Shader中的方向变换 法线变换 这 ...
- CSS transform属性
一,transform(变形) 1.transform字面的意思就是变形的意思,在CSS3中,transform支持的几个操作有 (1)旋转rotate. (2)扭曲skew. (3)缩放scale ...
- Gradle Transform API 和 Annotation Processor 简要介绍
原始网页直通车 Transform 对字节码进行操作,作用在 java 代码编译生成 class 文件之后, class 打包生成 apk 之前,在这段时间内,遍历修改 class 文件. Trans ...
- C++/C++11中std::transform的使用
std::transform函数是将某操作应用于指定范围的每个元素.要使用std::transform函数需要包含<algorithm>头文件. 以下是对std::transform的解释 ...
最新文章
- DB2load遇到SQL3508N错误
- main方法_错误: 在类 ZiFUChuan.Pyramid 中找不到 main 方法, 请将 main 方法定义为:
- mysql提示performance_schema缺表
- Java字符串那些事儿
- 命令行下操作常用语句
- keypair java_如何在Java中序列化和反序列化RSA KeyPair
- 判断 小程序_第五届美亚杯赛前必备:从案情资料到小程序解题
- c语言循环字符,字符串 非暴力for循环法(内附C语言代码)
- hdu3579(中国剩余问题经典)
- jquery定时滑出可最小化的底部提示层
- java集群_「Java知识」MyCat的图文视频讲解,MyCat分片集群分表分库策略
- python冒泡循环示例_Python for循环示例
- 华为路由器配置静态路由和下一跳,使PC互通
- 论车牌识别与电子警察关系
- Friedman 检验后的two-tailed Nemenyi test和the two-tailed Bonferroni-Dunn test的关键值
- 基于深度学习的恶意软件检测Python代码及数据
- FTX创办人SBF:区块链并不是炒作,有三大应用场景可大放异彩
- 常见锁的区别及适用场景
- 2023年智能无人系统与人工智能国际会议(SIUSAI 2023)
- S300V的前世今生