Transformations on DStreams之transform 实现黑名单操作/指定过滤

官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
transform(func)
Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary(任意的) RDD operations on the DStream.

rdd实现

package g5.learningimport org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject BlackListApp {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[2]").setAppName(" LogAppscala")val sc = new SparkContext(sparkConf)val input  =new ListBuffer[(String,String)]
//相当于输入数据input.append(("doudou","doudou info"))input.append(("huahua","huahua info"))input.append(("zhang","zhanginfo"))input.append(("xiaoxiao","xiaoxiao info"))
val inputRDD =sc.parallelize(input)//转化成RDDval blackTuple = new ListBuffer[(String,Boolean)]
blackTuple.append(("doudou",true))
val blackRDD =sc.parallelize(blackTuple)inputRDD.leftOuterJoin(blackRDD).filter(x =>{x._2._2.getOrElse(false)!=true
}).map(_._2._1).collect().foreach(println)sc.stop()}
}

transform的实现

package g5.learningimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable.ListBufferobject TransformApp {def main(args: Array[String]): Unit = {//准备工作val conf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")val ssc = new StreamingContext(conf, Seconds(10))val blackTuple = new ListBuffer[(String,Boolean)]blackTuple.append(("doudou",true))val blackRDD =ssc.sparkContext.parallelize(blackTuple)这里发现sc没有,要用ssc.sparkContext来获取//业务逻辑val lines = ssc.socketTextStream("hadoop001", 9999)lines.map(x=>(x.split(",")(0),x)).transform(rdd =>{rdd.leftOuterJoin(blackRDD).filter(x =>{x._2._2.getOrElse(false)!=true}).map(_._2._1)}).print()//streaming的启动ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate}
}

在这之前要nc -lk 9999
将他启动

主要:

是通过transform这个算子,把rdd和streaming联系到一起,单独使用这个streaming是完成不了的,

扩展:

不停作业读,把你的东西配到数据库里面就可以了,让他定时的读数据库,索要上线的配到数据库里面就可以了
假设有2000个,你今天配一个,明天配2个,,,,难道你的表里面配2000个才能上线么
肯定不是的,对于这种场景,肯定是不行的,这里就需要你个开关
你今天2000个都上完了,明天又来了50个,难道还要加到数据库里面去吗
2000上完,说明你整个功能没有问题了,后面就加的话就做一个开关
开关是什么意思呢
要读配置,是否读配置 0 1
–conf spark.filter.switch 放开关
–conf spark.filter.domains 放域名

好处,每次上线就不用再配置到这张表里面去了
生产用的非常多

Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤相关推荐

  1. Transformations on DStreams之updateStateByKey 的使用和状态累加

    Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤 https://blog.csdn.net/qq_43688472/article/deta ...

  2. 《认清C++语言》的random_shuffle()和transform()算法

    1)STL中的函数random_shuffle()用来对一个元素序列进行重新排序(随机的),包含在头文件 algorithm.h中 函数原型如下: template<class RandomAc ...

  3. 《认清C++语言》のrandom_shuffle()和transform()算法

    1)STL中的函数random_shuffle()用来对一个元素序列进行重新排序(随机的),函数原型如下: template<class RandomAccessIterator> voi ...

  4. CSS3与页面布局学习笔记(六)——CSS3新特性(阴影、动画、渐变、变形( transform)、透明、伪元素等)...

    一.阴影 1.1.文字阴影 text-shadow <length>①: 第1个长度值用来设置对象的阴影水平偏移值.可以为负值 <length>②: 第2个长度值用来设置对象的 ...

  5. pandas数据分组聚合——groupby()、aggregate()、apply()、transform()和filter()方法详解

    数据分组 数据分组就是根据一个或多个键(可以是函数.数组或df列名)将数据分成若干组,然后对分组后的数据分别进行汇总计算,并将汇总计算后的结果进行合并,被用作汇总计算的函数称为聚合函数.数据分组的具体 ...

  6. [Rotation Transform] 旋转变换

    [Rotation Transform] 旋转变换 旋转矩阵 绕当前坐标系指定轴的变换矩阵 给定欧拉角的变换 欧拉角表示旋转的缺点 四元数与旋转 旋转顺序的验证 Shader中的方向变换 法线变换 这 ...

  7. CSS transform属性

    一,transform(变形) 1.transform字面的意思就是变形的意思,在CSS3中,transform支持的几个操作有 (1)旋转rotate. (2)扭曲skew. (3)缩放scale ...

  8. Gradle Transform API 和 Annotation Processor 简要介绍

    原始网页直通车 Transform 对字节码进行操作,作用在 java 代码编译生成 class 文件之后, class 打包生成 apk 之前,在这段时间内,遍历修改 class 文件. Trans ...

  9. C++/C++11中std::transform的使用

    std::transform函数是将某操作应用于指定范围的每个元素.要使用std::transform函数需要包含<algorithm>头文件. 以下是对std::transform的解释 ...

最新文章

  1. DB2load遇到SQL3508N错误
  2. main方法_错误: 在类 ZiFUChuan.Pyramid 中找不到 main 方法, 请将 main 方法定义为:
  3. mysql提示performance_schema缺表
  4. Java字符串那些事儿
  5. 命令行下操作常用语句
  6. keypair java_如何在Java中序列化和反序列化RSA KeyPair
  7. 判断 小程序_第五届美亚杯赛前必备:从案情资料到小程序解题
  8. c语言循环字符,字符串 非暴力for循环法(内附C语言代码)
  9. hdu3579(中国剩余问题经典)
  10. jquery定时滑出可最小化的底部提示层
  11. java集群_「Java知识」MyCat的图文视频讲解,MyCat分片集群分表分库策略
  12. python冒泡循环示例_Python for循环示例
  13. 华为路由器配置静态路由和下一跳,使PC互通
  14. 论车牌识别与电子警察关系
  15. Friedman 检验后的two-tailed Nemenyi test和the two-tailed Bonferroni-Dunn test的关键值
  16. 基于深度学习的恶意软件检测Python代码及数据
  17. FTX创办人SBF:区块链并不是炒作,有三大应用场景可大放异彩
  18. 常见锁的区别及适用场景
  19. 2023年智能无人系统与人工智能国际会议(SIUSAI 2023)
  20. S300V的前世今生

热门文章

  1. nginx命令和配置
  2. 23亿美元大市场,NFV做好了准备吗?
  3. 【数据结构笔记42】哈希表应用:文件中单词词频统计
  4. 【操作系统/OS笔记11】并发执行的必要性,产生的问题,原子操作,为什么引入锁机制,面包购买的类比
  5. 【数据结构笔记01】什么是数据结构
  6. verilog实现多周期处理器之——(五)移动操作(通用数据传送)指令的实现
  7. java hashset 源码_Java集合:HashSet的源码分析
  8. vue.js环境部署
  9. 了解了这些才能开始发挥jQuery的威力
  10. mysql支持数据安全的引擎_MySQL 支持的存储引擎