测试数据(通过Socket传入):

20180808,zs
20180808,ls
20180808,ww

黑名单列表(生产存在表):

zs
ls

思路:

1、原始日志可以通过Streaming直接读取成一个DStream
2、名单通过RDD来模拟一份

逻辑实现:

1、将DStream转成以下格式(黑名单只有名字)

(zs,(20180808,zs))(ls,(20180808,ls))(ww,( 20180808,ww))
2、然后将黑名单转成

(zs, true)(ls, true)
3、然后DStram与RDD进行LeftJoin(DStream能与RDD进行Join就是借用的transform算子)

具体代码实现及注释:

package com.soul.spark.Streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*** @author soulChun* @create 2019-01-10-16:12*/
object TransformApp {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("StatafulFunApp").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(10))//构建黑名单val blacks = List("zs", "ls")//通过map操作将黑名单结构转换成(zs, true)(ls, true)val blackRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))val lines = ssc.socketTextStream("localhost", 8769)//lines (20180808,zs)//lines 通过map.split(1)之后取得就是zs,然后加一个x就转成了(zs,(20180808,zs)).就可以和blackRDD进行Join了val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {//Join之后数据结构就变成了(zs,[(20180808,zs),true]),过滤掉第二个元素中的第二个元素等于true的rdd.leftOuterJoin(blackRDD).filter(x => x._2._2.getOrElse(false) != true)//我们最后要输出的格式是(20180808,zs),所以取Join之后的第二个元素中的第一个元素.map(x => x._2._1)})ssc.start()ssc.awaitTermination()}
}

最后输出:

生产Spark Streaming 黑名单过滤案例相关推荐

  1. Spark Streaming 图片处理案例介绍

    Spark Streaming 图片处理案例介绍 本文首先介绍了流式处理框架的设计原理.Spark Streaming 的工作原理,然后通过一个基于 Spark Streaming 编写的读取.分析. ...

  2. java,spark实现黑名单过滤

    /** * java,spark实现黑名单过滤 */ public class BlackListFilter {public static void main(String[] args){Spar ...

  3. 通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

    本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的 ...

  4. Spark Streaming实时流处理学习

    目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与 ...

  5. 【原创 HadoopSpark 动手实践 11】Spark Streaming 应用与动手实践

    [原创 Hadoop&Spark 动手实践 11]Spark Streaming 应用与动手实践 目标: 1. 掌握Spark Streaming的基本原理 2. 完成Spark Stream ...

  6. Spark Streaming 实时计算在甜橙金融监控系统中的应用、性能优化、任务监控

    1 写在前面 目前公司对实时性计算的需要及应用越来越多,本文选取了其中之一的 Spark Streaming 来介绍如何实现高吞吐量并具备容错机制的实时流应用.在甜橙金融监控系统项目中,需要对每天亿万 ...

  7. 用spark streaming实现黑名单实时过滤

    项目介绍: 本项目用spark streaming实现简单的黑名单实时过滤,用scala语言编写,用到的知识点如下: 1.RDD,弹性分布式数据集 2.ssc.socketTextStream(&qu ...

  8. 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数

    第103课:动手实战联合使用Spark Streaming.Broadcast.Accumulator实现在线黑名单过滤和计数 /* 王家林老师授课http://weibo.com/ilovepain ...

  9. 通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验

    本期内容 : spark streaming另类在线实验 瞬间理解spark streaming本质 一.  我们最开始将从Spark Streaming入手 为何从Spark Streaming切入 ...

最新文章

  1. springmvc是什么_当一个http请求来临时,SpringMVC究竟偷偷帮你做了什么?
  2. TS Decorator
  3. 鸿蒙第三代手机,荣耀Magic 3最新确认,鸿蒙系统+双6400万,最期待的荣耀来了
  4. python123m与n的数学运算_python小白进阶之路三——循环结构入门练习+Random库练习...
  5. C# 判断上传图片是否被PS修改过的方法
  6. 遇到问题的时候,要学会问问题
  7. [jQuery]20+ Brilliant and Advanced jQuery Effects
  8. Atitit usrQC27模块化的规范模块化法 v4 t77 目录 1. 模块化层级(软件项目 1 1.1. 子项目》命名空间package机制》类》类文件》方法函数级别》语句 1 2. 常见的
  9. GPS数据包格式+数据解析
  10. NovelAi + Webui + Stable-diffusion本地配置
  11. 良好的编程习惯有哪些?
  12. 被告知孩子学校偷钱后
  13. pil.unidentifiedimageerror: cannot identify image file
  14. eclipse启动报错: Could not reserve enough space for object heap error
  15. 【回归分析】[6]--残差分析
  16. 工具 | Windows 功能猎手 (WFH)
  17. 智力题:一次测试找出1000瓶酒中的唯一一瓶毒酒
  18. 游戏AI三大难:样本大、成本高、灵活性差
  19. 读者福利!多达 2048G 各种资源免费赠送
  20. 简述Mysql创建用户和权限设置

热门文章

  1. linux oracle 10g dataguard 实施详细记录
  2. printf和sprintf
  3. 数据库下午怎么插入_数据库性能调优大全(附某大型医院真实案例)
  4. 【数据结构笔记09】二叉树的定义、性质、实现
  5. 观看台式计算机组成观后感,计算机组成原理实验一:运算器实验
  6. linux挂载nfs权限不够,无法写入挂载点(nfs-server),获得“权限被拒绝”
  7. 该行已经属于另一个表 的解决方法
  8. linux sql server调优,SQL SERVER性能优化(转)
  9. python executemany_Python MySQLdb executemany
  10. 详细解读八大无线网络安全技术利弊