和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算 函数,但 AggregateFunction 在窗口计算上更加通用。AggregateFunction 接口相对 ReduceFunction 更加灵活,实现复杂度也相对较高。AggregateFunction 接口中定义了三个 需要复写的方法,其中 add()定义数据的添加逻辑,getResult 定义了根据 accumulator 计 算结果的逻辑,merge 方法定义合并 accumulator 的逻辑。

package windowimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector/*** @Author yqq* @Date 2021/12/27 20:42* @Version 1.0*/
case class StationLog(sid:String,callOut:String,callInput:String,callType:String,callTime:Long,duration:Long)
object AggregatFunctionTest {def main(args: Array[String]): Unit = {val environment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.streaming.api.scala._//每隔5秒统计最近8秒内,每个基站的日志数量//读取数据源val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888).map(line => {val arr: Array[String] = line.split(",")new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)})//开窗stream.map(log=>{(log.sid,1)}).keyBy(_._1).window(SlidingProcessingTimeWindows.of(Time.seconds(8),Time.seconds(5)))//开窗,滑动窗口.aggregate(new MyAggregateFuntion,new MyWindowFunction).print()environment.execute()}//MyWindowFunction 输入数据来自于 MyAggregateFuntion,在窗口结束的时候先执行MyAggregateFuntion对象的getResult,然后在执行apply方法class MyWindowFunction extends WindowFunction[Long,(String,Long),String,TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {out.collect((key,input.iterator.next()))//next得到的第一个值,迭代器中只有一个值}}class MyAggregateFuntion extends AggregateFunction[(String,Int),Long,Long] {//初始化一个累加器,开始的时候为0override def createAccumulator(): Long = 0//来一条数据执行一次override def add(value: (String, Int), accumulator: Long): Long = accumulator+value._2//在窗口结束的时候执行一次override def getResult(accumulator: Long): Long = accumulatoroverride def merge(a: Long, b: Long): Long = a+b}
}


Flink中Window详解之Window的聚合函数AggregateFunction相关推荐

  1. Python中lambda详解(包括内置函数map、reduce、filter、sorted、max)

    文章目录 一.lambda是什么? 1.lambda语法 2.语法详解 二.lambda的使用 1.定义 2.调用 3.替换 4.作返回值 三.lambda作参数 1.map函数 2.reduce函数 ...

  2. 【转】图形流水线中坐标变换详解:模型矩阵、视角矩阵、投影矩阵

    转自:图形流水线中坐标变换详解:模型矩阵.视角矩阵.投影矩阵_sherlockreal的博客-CSDN博客_视角矩阵 图形流水线中坐标变换详解:模型矩阵.视角矩阵.投影矩阵 图形流水线中坐标变换过程 ...

  3. android中getSystemService详解

    原文地址:android中getSystemService详解作者:邹斌 http://blog.sina.com.cn/s/blog_71d1e4fc0100o8qr.html http://blo ...

  4. 19. linux中权限详解,Linux权限位,读写执行权限真正含义,chmod详解

    linux中权限详解,Linux权限位,读写执行权限真正含义,chmod详解 文章目录 Linux权限位 读写执行 三种权限真正含义和作用 权限对文件的作用 权限对目录的作用 示例 chmod 使用数 ...

  5. 浏览器中location详解

    浏览器中location详解 window.location对象用于获取当前页面的URL信息. 属性解析 1. href 当前页面的URL. 比如访问github.com,在控制台中输入locatio ...

  6. 图形流水线中坐标变换详解:模型矩阵、视角矩阵、投影矩阵

    图形流水线中坐标变换详解:模型矩阵.视角矩阵.投影矩阵 图形流水线中坐标变换过程 模型矩阵:模型局部坐标系和世界坐标系之间的桥梁 1.模型局部坐标系存在的意义 2.根据模型局部坐标系中点求其在世界坐标 ...

  7. 函数中{}输出格式详解(C#)

    Console.WriteLine()函数中{}输出格式详解(C#) Console.WriteLine()函数的格式一直没怎么注意.今天同事问起Console.WriteLine({0:D3},a) ...

  8. Java中CAS详解

    转载自  Java中CAS详解 在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁 锁机制存在以下问题: (1)在多线程竞争下,加锁.释放锁会导致比较多的上下文切换 ...

  9. C++学习笔记章节中 面向对象详解

    C++ 类&对象 C++类定义 本质上是一个数据类型的蓝图,定义了类的对象包含的信息,以及可以在这个类对象上执行哪些操作. 类的定义是以class开头,后面接类的名称. 类的主体是包含在一个花 ...

最新文章

  1. 用Python分析5000+抖音大V,粉丝最喜欢的视频类型是它
  2. 生产管理要点:快执行、高品质、看板追踪!
  3. PowerShell导出共存环境下的Exchange数据库列表
  4. 怎么用js实现jq的removeClass方法
  5. html调用servlet(JDBC在Servlet中的使用)(2)
  6. 《HTML5和CSS3快速参考》——1.3HTML5的品牌化
  7. 新浪sae平台进行数据库的连接
  8. java使用httpclient封装post请求和get的请求
  9. Byobu(tmux)的使用与定制
  10. Java 8 lambda初试
  11. 难得干货,揭秘支付宝的2维码扫码技术优化实践之路
  12. 在RFID标准协议中 ISO18000-6B和ISO18000-6C的优点及区别
  13. openCV 简单实现身高测量(二)
  14. 教育变革背景下幼儿园园长领导力研究
  15. 服务器数码管不显示,数码管常见故障及检修方法
  16. 深度学习:GCN(图卷积神经网络)理论学习总结
  17. JS纯前端实现文件保存
  18. yui2 datatable转换至yui3
  19. Linux下c语言的图形编程
  20. Python 办公自动化:全网最强最详细 PDF 文件操作手册!

热门文章

  1. qcqa是什么职位_QA/QE/QC/SQE的区别是什么?
  2. 何谓网站优化中的“内容为王”
  3. Atmel跑Linux的arm芯片,Atmel针对Linux的低成本嵌入式设计推出新
  4. 找不到该类NoClassDefFoundError:net/sf/ezmorph/Morpher
  5. 天线工程手册_技术大神给工控工程师快速成长的六点建议
  6. netbox-docker安装
  7. 流量分析题目(流量检索,数据提取,数据重组,伪加密,图片提取)
  8. km之路--010 jquery 002 开发一个 手风琴/折叠面板 插件
  9. 使用Quixel Bridge导入资源到UE4
  10. 切图设计工具软件或平台