目录

1. transformation函数之map(),filter()函数的简单应用:

2. union

3.Split与Select


1. transformation函数之map(),filter()函数的简单应用:

object DataStreamTransformationAPP {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentfilterFunction(env)env.execute("DataStreamTransformationAPP")}def filterFunction(env: StreamExecutionEnvironment): Unit ={val data: DataStream[Long] = env.addSource(new CustomerNonParallesourceFunction)data.map(x=>{println("received:"+x)  //此处为接收到的数据,即未处理的数据x}).filter(_%2==0)//此处过滤.print().setParallelism(1)}
}

2. union

dataStream.union(otherStream1, otherStream2, ...);

Union of two or more data streams creating a new stream containing all the elements from all the streams.

Note: If you union a data stream with itself you will get each element twice in the resulting stream.:如果你union数据流和它本身,你会的到这个数据流两次

def unionFunction(env: StreamExecutionEnvironment): Unit ={val data1: DataStream[Long] = env.addSource(new CustomerNonParallesourceFunction)val data2: DataStream[Long] = env.addSource(new CustomerNonParallesourceFunction)data1.union(data2).print().setParallelism(1)}

3.Split与Select

1)DataStream → SplitStream:Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")//奇数
      case 1 => List("odd")//偶数
    }
)

2)SplitStream → DataStream:Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

  def splitSelectFunction(env: StreamExecutionEnvironment): Unit ={val data: DataStream[Long] = env.addSource(new CustomerNonParallesourceFunction)val splits: SplitStream[Long] = data.split(new OutputSelector[Long] {override def select(value: Long): lang.Iterable[String] = {val list = new util.ArrayList[String]()if (value % 2 != 0) {list.add("even")} else {list.add("odd")}list}})splits.select("odd").print().setParallelism(1)}

5-10transformation中部分函数使用相关推荐

  1. python函数式编程什么意思_函数式编程中“部分函数”到底是什么意思?

    答案说明了一切,我将在每种语言中添加一个示例:def add(x,y): return x+y f = add(1) print(f(3)) f = add(1) TypeError: add() m ...

  2. CImg库中部分函数的作用和用法

    部分内容来自于CImg参考手册或CImg的Doxygen手册 1.宏cimg_usage(usage):可以被用来描述程序的目的和使用情况.它通常被插入到 int main(int argc, cha ...

  3. DllMain中不当操作导致死锁问题的分析——DllMain中要谨慎写代码(完结篇)

    之前几篇文章主要介绍和分析了为什么会在DllMain做出一些不当操作导致死锁的原因.本文将总结以前文章的结论,并介绍些DllMain中还有哪些操作会导致死锁等问题.(转载请指明出于breaksoftw ...

  4. python中functools_Python functools模块完全攻略

    functools 模块中主要包含了一些函数装饰器和便捷的功能函数.在 Python 的交互式解释器中先导入 functools 模块,然后输入 [e for e in dir(functools) ...

  5. Unity3D RTS游戏中帧同步实现

    帧同步技术是早期RTS游戏常用的一种同步技术,本篇文章要给大家介绍的是RTX游戏中帧同步实现,帧同步是一种前后端数据同步的方式,一般应用于对实时性要求很高的网络游戏,想要了解更多帧同步的知识,继续往下 ...

  6. GNU Make 使用手册(于凤昌中译版)

    GNU Make 使用手册(中译版) 翻译:于凤昌 GNU make Version 3.79 April 2000 Richard M. Stallman and Roland McGrath 1 ...

  7. 发现一个windows7(32bit或64bit)DirectUI的bug

    前段时间发现一个windows7的一个bug,不是什么严重的问题,我在此记录下.(转载请指明出于breaksoftware的csdn博客) 重现步骤如下: 0 在文件夹的"更改您的视图&qu ...

  8. c++引用matlab类,matlab调用C++函数浅谈(一)

    由于在下才疏学浅,在网上看各高手指南时亦觉云里雾里,遂决定一切说明从最基础说起,一是方便自己(记性奇差),二是方便似我的小白.以下部分是我从各网站论坛等摘抄.重组.改写过的,以求更加详实明朗,由于参考 ...

  9. 【Python】学习笔记总结(第一阶段(1-6)——汇总篇)

    文章目录 一.Python基础 1.注释 2.输入输出 3.数据类型 4.运算符 4.1.算数运算符 4.2.比较运算符 4.3.逻辑运算符 4.3.1.短路运算 4.4.赋值运算符 5. 判断与循环 ...

最新文章

  1. Java实现markdown转html,并且生成TOC目录
  2. 资源丨机器学习进阶路上不可错过的28个视频
  3. 4月25日日志(4.25)
  4. 如何用css将超出部分变成...
  5. 【换句话说】【等价描述】—— 定义及概念的不同描述
  6. 华为服务器软件维护,服务器系统软件维护
  7. Puppet常用资源使用详解
  8. 数据库系统概论第五版课后习题答案王珊
  9. 根据身份证号计算周岁年龄
  10. SQL Server 触发器
  11. 互联网常见域名后缀含义
  12. opencv normalize()函数详解
  13. bcc语料库下载_语料库汇总
  14. 【SEED Labs 2.0】TCP Attacks Lab
  15. scikit-opt的使用
  16. vue3实现鼠标左键拖拽画矩形框框选功能
  17. windows如何远程桌面mac
  18. 清除H5的浏览器缓存
  19. 原生js实现轮盘抽奖,控制中奖概率(完整示例)
  20. java 拼音首字母_java获取中文拼音首字母的实例

热门文章

  1. 算法我也不知道有没有下一个---一个题目的开端(索引堆与图)
  2. 荷兰籍空乘服务中国春运:对春节有着别样的感受
  3. Apache Spark 2.2.0 中文文档 - 概述 | ApacheCN
  4. Burpsuite如何抓取使用了SSL或TLS传输的 IOS App流量
  5. 【温故而知新-Javascript】窗口效果 (全屏显示窗口、定时关闭窗口)
  6. mysql增备脚本--xtrabackup实现
  7. 关于ireport制作报表模版时的一些注意
  8. 向右挪一个键位使密码好记又安全
  9. 升级php7_Drupal 8.8.0 正式发布,升级须知
  10. php计时器每过24小时结果加一倍,单片机时钟24小时慢了21.5秒 如何修改初值校正呢?...