我不确定我必须使用哪个stream Flink transformation来计算某个流的平均值并在5秒的时间内更新一个状态(假设它是一个int array)。

streamSource

.keyBy(0)

.flatMap(new MyRichFlatMapFunction())

.print()

我不能把它写在窗户上。

streamSource

.keyBy(0)

.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

.aggregate(new MyAggregateFunction())

.print()

我无法通过ValueState保持 array状态。

我试图使用RichAggregateFunction,但遇到了这个线程的相同问题。使用RichAggregateFunction时的Flink错误有没有其他方法来计算平均值并跟踪Flink中的另一个状态?

在 Flink我该如何处理这个问题?

streamStations.filter(new SensorFilter("COUNT_TR"))

.map(new TrainStationMapper())

.keyBy(new MyKeySelector())

.window(TumblingEventTimeWindows.of(Time.seconds(5)));

// THIS AGGREGATE DOES NOT WORK

// .aggregate(new AverageRichAggregator())

// .print();

public static class AverageRichAggregator extends

RichAggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {

private static final long serialVersionUID = -40874489412082797L;

private String functionName;

private ValueState<CountMinSketch> countMinSketchState;

@Override

public void open(Configuration parameters) throws Exception {

ValueStateDescriptor<CountMinSketch> descriptor = new ValueStateDescriptor<>("countMinSketchState",

CountMinSketch.class);

this.countMinSketchState = getRuntimeContext().getState(descriptor);

}

@Override

public Tuple3<Double, Long, Integer> createAccumulator() {

this.countMinSketchState.clear();

return new Tuple3<>(0.0, 0L, 0);

}

@Override

public Tuple3<Double, Long, Integer> add(

Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double> value,

Tuple3<Double, Long, Integer> accumulator) {

try {

if (value.f1.f1.equals("COUNT_PE")) {

// int count = (int) Math.round(value.f2);

// countMinSketch.updateSketchAsync("COUNT_PE");

} else if (value.f1.f1.equals("COUNT_TI")) {

// int count = (int) Math.round(value.f2);

// countMinSketch.updateSketchAsync("COUNT_TI");

} else if (value.f1.f1.equals("COUNT_TR")) {

// int count = (int) Math.round(value.f2);

// countMinSketch.updateSketchAsync("COUNT_TR");

}

CountMinSketch currentCountMinSketchState = this.countMinSketchState.value();

currentCountMinSketchState.updateSketchAsync(value.f1.f1);

this.countMinSketchState.update(currentCountMinSketchState);

} catch (IOException e) {

e.printStackTrace();

}

return new Tuple3<>(accumulator.f0 + value.f2, accumulator.f1 + 1L, value.f1.f4);

}

@Override

public Tuple2<String, Double> getResult(Tuple3<Double, Long, Integer> accumulator) {

String label = "";

int frequency = 0;

try {

if (functionName.equals("COUNT_PE")) {

label = "PEOPLE average on train station";

// frequency = countMinSketch.getFrequencyFromSketch("COUNT_PE");

} else if (functionName.equals("COUNT_TI")) {

label = "TICKETS average on train station";

// frequency = countMinSketch.getFrequencyFromSketch("COUNT_TI");

} else if (functionName.equals("COUNT_TR")) {

label = "TRAIN average on train station";

// frequency = countMinSketch.getFrequencyFromSketch("COUNT_TR");

}

frequency = this.countMinSketchState.value().getFrequencyFromSketch(functionName);

} catch (IOException e) {

e.printStackTrace();

}

return new Tuple2<>(label + "[" + accumulator.f2 + "] reads[" + frequency + "]",

((double) accumulator.f0) / accumulator.f1);

}

@Override

public Tuple3<Double, Long, Integer> merge(Tuple3<Double, Long, Integer> a, Tuple3<Double, Long, Integer> b) {

return new Tuple3<>(a.f0 + b.f0, a.f1 + b.f1, a.f2);

}

}

错误:

Exception in thread "main" java.lang.UnsupportedOperationException: This aggregation function cannot be a RichFunction.

at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:692)

at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.(MultiSensorMultiStationsReadingMqtt2.java:71)

at org.sense.flink.App.main(App.java:141)

谢谢

java求平均值过程不输入,java-Flink:有没有其他方法来计算平均值和一个状态变量,而不是使用RichAggregateFunction?...相关推荐

  1. R语言计算平均值的标准误差(standard error of the mean):自定义函数计算平均值的标准误差、使用plotrix包的std.error函数计算平均值的标准误差

    R语言计算平均值的标准误差(standard error of the mean):自定义函数计算平均值的标准误差.使用plotrix包的std.error函数计算平均值的标准误差 目录

  2. Java黑皮书课后题第5章:**5.45(统计:计算平均值和标准方差)在商务应用中……编写一个程序,提示用户输入10个数字,然后运用下面的公式,显示这些数字的平均值以及标准方差

    5.45(统计:计算平均值和标准方差)在商务应用中--编写一个程序,提示用户输入10个数字,然后运用下面的公式,显示这些数字的平均值以及标准方差 题目 题目描述 破题 运行示例 代码 题目 题目描述 ...

  3. java 求导函数_面向对象编程 —— java实现函数求导

    文章目录 首先声明一点,本文主要介绍的是面向对象(OO)的思想,顺便谈下函数式编程,而不是教你如何准确地.科学地用java求出函数在一点的导数. ★引子 defd(f) :defcalc(x) : d ...

  4. java求最大公约数_10道java经典算法题,小白必备,每一题都能提升你的java能力...

    JAVA经典算法 跪求关注,祝关注我的人都:身体健康,财源广进,福如东海,寿比南山,早上贵子,从不掉发! 更多java资料可以私信我领取! [程序1] 题目:古典问题:有一对兔子,从出生后第3个月起每 ...

  5. java求正方形的周长差_Java 作业 重写正方形周长方法

    题目 编写一个应用程序,再创建一个矩形类,类中具有长,宽两个成员变量和求周长的方法. 再创建一个矩形类子类, 正方形类,类中定义求面积方法,重写周长方法,在主类中,输入一个正方形的边长,在创建正方形对 ...

  6. java求最小步数_关于java:查找两点之间的最小步数?

    我有一个网格,网格有两个"材料"- 地板 壁 例如 : 在此网格中,我们具有具有大小和位置的对象(对象的位置是左上角的点). 我们可以在每个对象上执行一些操作,例如- 提升 下移 ...

  7. Java连续获取两个输入,java 获取控制台的输入的两个方法

    链接里面写了两种 http://blog.csdn.net/songylwq/article/details/7714622 第三种: public class GcdLcm{ public stat ...

  8. Java求最大公约数与最小公倍数

    Java求最大公约数与最小公倍数 Java求最大公约数与最小公倍数 import java.util.Scanner;public class Main {public static void mai ...

  9. 年年有余之java求余的技巧集合

    背景 传说里玉皇大帝派龙王马上降雨到共光一带,龙王接到玉皇大帝命令,立马从海上调水,跑去共光施云布雨,但粗心又着急的龙王不小心把海里的鲸鱼随着雨水一起降落在了共光,龙王怕玉皇大帝责怪,灵机一动便声称他 ...

最新文章

  1. 基于python和OpenCV构建智能停车系统
  2. D语言与C语言的互相调用
  3. BC26通过MQTT协议连接ONENET,AT流程
  4. ios 使用gcd 显示倒计时
  5. elasticsearch版本不同,批量查询也不相同
  6. WebForm页面使用Ajax
  7. java实现微信创建菜单_java实现微信公众平台自定义菜单的创建示例
  8. Myeclipse 10 使用之修改字体
  9. 图---Dijstra
  10. Sharepoint学习笔记---如何找到SharePoint List的Template ID
  11. python哈夫曼树_python霍夫曼树
  12. Persistent Memory编程简介
  13. Go语言的IO库那么多,我该怎么选?
  14. 烟台大学计算机学院宋宜斌教授,宋宜斌
  15. javaweb实训第二天上午——jQuery基础
  16. 一个女SEOer的忏悔书
  17. java开发注意事项
  18. win10任务栏透明_谈谈我的Windows系统使用历程和win10桌面美化
  19. NPOI word中插入图片
  20. 传说中开场晕过去一千多人的迈克杰克逊演唱会,有生之年请看下这个视频

热门文章

  1. java B2B2C 源码多租户电子商城系统-Spring Cloud组件详解
  2. vue中使用promise
  3. [swift]-使用JavaScript解决WKWebView无法发送POST参数问题
  4. -webkit-font-smoothing
  5. 《数学建模:基于R》一一2.1 回归分析
  6. 大数据技术在传统企业信息化的应用
  7. 去除Office 2010的右键“共享文件夹同步”菜单
  8. 关系竞争 :今目标可能成为个一个的企业社交网络
  9. 最快的ASP无组件上传类(4M只需10秒)0.96版
  10. linux遭入侵挖矿进程被隐藏案例分析