前言:
transform不是transformation,后者是对所有的转换算子的统称,transform也是transformation算子中的一个。transform算子的主要作用就是为了弥补Streaming没有提供的相关功能的操作,比如:一个DStream和RDD进行关联操作join,或者减少分区数量。

一、使用transform来完成分区减少coalsce的操作

dstream.transform(_.coalesce(newPartitionNum))

二、使用transform来完成DStream和RDD的join操作

计费系统,是电商必不可少的一个功能点。为了防止恶意的广告点击(假设商户A和B同时在某电商做了广告,A和B为竞争对手,那么如果A使用点击机器人进行对B的广告的恶意点击,那么B的广告费用将很快被用完),必须对广告点击进行黑名单过滤。黑名单的过滤可以是ID,可以是IP等等,黑名单就是过滤的条件,利用SparkStreaming的流处理特性,可实现实时黑名单的过滤实现。可以使用leftouter join 对目标数据和黑名单数据进行关联,将命中黑名单的数据过滤掉。

?:
/*** 使用transform进行在线黑名单过滤** 使用ip作为黑名单*     27.19.74.143*     110.52.250.126* 数据的格式:*     27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127**/
object Transform2BlacklistFileter {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Transform2BlacklistFileter").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
//准备黑名单val blacklist:RDD[(String, Int)] = ssc.sparkContext.parallelize(Map("27.19.74.143" -> 1,"110.52.250.126" -> 1).toList)
//网络请求,包含了黑名单中的数据val lines:DStream[String] = ssc.socketTextStream("bigdata01", 9999)//数据示例//27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127val ip2Info:DStream[(String, String)] = lines.map(line => {val ip = line.substring(0, line.indexOf("##"))val info = line.substring(line.indexOf("##") + 2)(ip, info)})
val filtered:DStream[(String, String)] = ip2Info.transform(infoRDD => {//            infoRDD.join(blacklist)//结果是黑名单中的数据,必须要val joinedRDD:RDD[(String, (String, Option[Int]))] = infoRDD.leftOuterJoin(blacklist)
joinedRDD.filter{case (ip, (info, option)) => !option.isDefined}.map{case (ip, (info, option)) => (ip, info)}})filtered.print()ssc.start()ssc.awaitTermination()}
}

sparkStreaming算子之transform相关推荐

  1. 【SparkStreaming学习之二】 SparkStreaming算子操作

    环境 虚拟机:VMware 10 Linux版本:CentOS-6.5-x86_64 客户端:Xshell4 FTP:Xftp4 jdk1.8 scala-2.10.4(依赖jdk1.8) spark ...

  2. Bookmarks(三)

    Bookmarks 书签栏 tooltips提示效果,支持点击与经过显示,位置和效果可以自定义 - CSDN博客 疯狂的小萝卜头 - 博客园 [Kettle从零开始]第九弹之Kettle定时任务介绍 ...

  3. SparkStreaming DStream入门及其算子应用

    什么是Dstream? 离散流(DStream)是Spark Streaming中的基本抽象,是表示相同数据流的RDD(相同类型)的连续序列. DStreams可以使用StreamingContext ...

  4. Spark _30_SparkStreaming算子操作Driver HA

    SparkStreaming算子操作 foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行. import org.apache.sp ...

  5. Dstream如何应用RDD特有算子?

    比如collectAsMap算子Dstream并没有,如何用呢? 可以通过foreachRDD或者transform算子间接使用. 比如 //sparkStreaming 算子测试案例wordcoun ...

  6. SparkStreaming实时数仓——日活

    文章目录 一.日活需求概述 思路: 二.搭建实时处理模块 前期准备: 2.1 创建module 2.2 pom.xml文件中导入依赖.创建需要的package 2.3 添加需要的配置文件 2.3.1 ...

  7. sparkStreaming:实时流数据详解

    目录 一.概述 二.wordCount示例 三.初始化StreamingContext 四.DStreams(离散数据流) 五.输入DStream和接收器 Basic sources File Str ...

  8. spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

    目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...

  9. 大数据工程师入职京东年薪37w(附:面试真题分享)

    总结:引导面试官到自己擅长的领域,掌握主动权,问题回答不一定完整,说出关键点即可. 1.项目规模,一天/月数据量,各组件版本? 数据规模:一般100M数据由300万条数据:数据量:上百G:条数:达到几 ...

  10. Spark期末考试练习题

    一.单选题 1. 下面的端口不是 Spark 自带的服务端口的是___________. A. 8080 B. 4040 C. 8090 D. 18080 2. 下面的描述不是 RDD 的特点的是__ ...

最新文章

  1. 自定义 DataLoader
  2. 一步一步搭建ELK日志处理集群(自己做过测试)
  3. HADOOP学习笔记(一):HDFS
  4. JavaSE 6之脚本引擎让程序如虎添翼
  5. WAF自动化Fuzz工具-WAFNinja(绕WAF、绕过WAF)
  6. 基于汇编的 C/C++ 协程 - 实现
  7. 【撸码师的备忘录】JedisPool.returnResource()遭弃用
  8. 高等代数(第三版)北大(参考答案)
  9. 2021-01-13事件对象
  10. 苹果电脑桌面计算机图标不见了怎么办,苹果电脑桌面文件都不见了怎么显示
  11. 十大热门经典历史小说,大有希望获得第四届橙瓜网络文学奖
  12. 最新版cleanmymac4.11.3专业的Mac清理软件
  13. 走近棒球运动·堪萨斯城皇家队·MLB棒球创造营
  14. 数据挖掘--数据流挖掘
  15. 淘客基地免费商城CMS增加拾牛APP下载公告
  16. 【centos】安装wget------转发自【小姜dot】
  17. 程序员与代码之间的搞笑日常,笑的人肚子痛!
  18. 超时空智慧办公白皮书(2023)
  19. 世界上有多少Java开发人员?
  20. yum mysql的安装目录在哪_yum安装的mysql 目录结构

热门文章

  1. 杭州电子科技大学计算机学院院长,杭州电子科技大学计算机学院导师教师师资介绍简介-彭勇...
  2. 修改FTP和MSTSC默认端口号
  3. 宝塔利用同一个ip的不同端口号架设多个网站
  4. Java获取区间随机数公式
  5. min-max之间取随机数公式
  6. 工商阿里忙互殴 苏宁高调打假争做主角?
  7. 湖南成考新生如何查询学籍信息
  8. Kiwi Syslog日志服务器的安装及配置使用
  9. 鸡你太美,用大数据扒一扒蔡徐坤的真假流量粉
  10. php安装时候的源是什么,Windows安装源无效怎么办