window expects a time attribute for grouping in a stream environment.
完整报错如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)at GroupByWindowAggregation.main(GroupByWindowAggregation.java:44)
牢记概念:
Flink的Table必定有Schema
调试手段
代码中加入:
System.out.println(orders.getSchema());
root
|-- user: BIGINT
|-- product: STRING
|-- amount: INT
|-- rowtime: TIMESTAMP(3) *ROWTIME*
解决方案
Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());
对应的OrderStream是:
// *************************************************************************
// USER DATA TYPES
// *************************************************************************/** Simple POJO.*/import java.sql.Timestamp;
import org.apache.flink.streaming.api.windowing.time.Time;public class OrderStream
{public Long user;public String product;public int amount;public Long rowtime;public OrderStream(){}public OrderStream(Long user, String product, int amount,Long rowtime){this.user = user;this.product = product;this.amount = amount;this.rowtime=rowtime;}@Overridepublic String toString() {return "Order{" +"user=" + user +", product='" + product + '\'' +", amount ='" + amount + '\'' +", rowtime=" + rowtime +'}';}
}
对应的主程序为:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00new OrderStream(3L, "rubber", 2,1505527800L),//2017-09-16 10:10:00new OrderStream(3L, "rubber", 2,1505527800L),//2017-09-16 10:10:00new OrderStream(1L, "diaper", 4,1505528400L),//2017-09-16 10:20:00new OrderStream(1L, "diaper", 4,1505528400L)//2017-09-16 10:20:00));Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());System.out.println(orders.getSchema());
来自网友的补充:
Hunter x Hunter:
$("rowtime2").proctime()是表示自动生成一个字段 rowtime2
Hunter x Hunter:
$("rowtime2").rowtime()是表示用已经存在的字段 rowtime2用作eventime
注意,时间属性不是数据库表格里面的一个字段。[1]
[1]Time Attributes
window expects a time attribute for grouping in a stream environment.相关推荐
- 053试题 158/449/637 - Scheduler Window
题目: 158.You create two resource plans, one for data warehouse loading jobs at night and the other fo ...
- 彻底搞清 Flink 中的 Window 机制
[CSDN 编者按]Window是处理无限流的核心.Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.Flink提 ...
- Flink Window Function
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数. 文章目录 1.增量聚合函数 1.1 ReduceFunction 1.2 AggregateFu ...
- 彻底搞清Flink中的Window(Flink版本1.8)
flink-window 窗口 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理.当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分 ...
- 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程
时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...
- Flink Window机制详解
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch ...
- 【基础】Flink -- Time and Window
Flink -- Time and Window Flink 时间语义 水位线 Watermark 水位线的概念 有序流中的水位线 乱序流中的水位线 水位线的特性 水位线的基本使用 水位线生成策略 内 ...
- 数据挖掘流程_数据流挖掘
数据挖掘流程 1-简介 (1- Introduction) The fact that the pace of technological change is at its peak, Silicon ...
- Bluetooth ATT介绍
阅读目录 1 介绍 2 详细内容 3 Attribute PDU 4 Attribute Protocol PDU 回到顶部 1 介绍 ATT,Attribute Protocol,用于发现.读.写对 ...
最新文章
- C和C++的关键字区别
- Samba服务器问题汇总
- [C++STL]常用查找算法
- Django/Flask/Tornado三大web框架性能分析
- Java8————方法引用
- 先发不一定制人:美韩5G网络体验差遭吐槽
- javascript常见的数组方法
- 安装Exchange2007邮件系统
- 安卓pdf阅读器_PDF阅读用哪款软件好?推荐这7款,简单又好用!
- 三维激光LiDAR点云数据处理,我帮您!
- csdn七牛云存储作为网站的图片外链
- 10步搞定App内测发布(蒲公英内测平台)
- (六)springMvc 和 mybatis 整合
- 大数据学习总结(2021版)---Mysql基础
- java实现文本纠错功能_调用百度API进行文本纠错
- mysql创建新闻发布时间_基于PHP+mysql实现新闻发布系统的开发
- 切比雪夫,霍夫丁不等式证明
- Variant类型在各语言中的参数传递
- 供应链金融业务发展态势及提升路径
- 凡客“小米化”改造:雷军与陈年最基友的商业故事
热门文章
- Java程序员如何成为优秀的架构师
- python数学建模(三)插值常用库和模块
- 全国大学生智能汽车竞赛图像采集处理上位机开源!
- 【Visual Studio 2019 - Unknown override specifier error】Problems when compiling dbghelp.h
- java四大名著知乎_《西游记》是否被高估了?四大名著该如何排名?
- 什么是内卷?什么是囚徒困境?故事叙述
- #Vue3篇:watch、watchEffect、watchPostEffect、watchSyncEffect的区别
- ESP8266-WIFI模块使用AT指令连接外网服务器
- 窥一斑而知全豹,几分钟带你读懂Java字节码,再也不怕了
- This iPhone 8 Plus (Model A1864, A1897, A1898, A1899) is running iOS 12.2 (16E227)