完整报错如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)at GroupByWindowAggregation.main(GroupByWindowAggregation.java:44)

牢记概念:

Flink的Table必定有Schema

调试手段

代码中加入:

System.out.println(orders.getSchema());

root
 |-- user: BIGINT
 |-- product: STRING
 |-- amount: INT
 |-- rowtime: TIMESTAMP(3) *ROWTIME*

解决方案

Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());

对应的OrderStream是:


// *************************************************************************
//     USER DATA TYPES
// *************************************************************************/** Simple POJO.*/import java.sql.Timestamp;
import org.apache.flink.streaming.api.windowing.time.Time;public class OrderStream
{public Long user;public String product;public int amount;public Long rowtime;public OrderStream(){}public OrderStream(Long user, String product, int amount,Long rowtime){this.user = user;this.product = product;this.amount = amount;this.rowtime=rowtime;}@Overridepublic String toString() {return "Order{" +"user=" + user +", product='" + product + '\'' +", amount ='" + amount  + '\'' +", rowtime="  + rowtime +'}';}
}

对应的主程序为:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00new OrderStream(3L, "rubber", 2,1505527800L),//2017-09-16 10:10:00new OrderStream(3L, "rubber", 2,1505527800L),//2017-09-16 10:10:00new OrderStream(1L, "diaper", 4,1505528400L),//2017-09-16 10:20:00new OrderStream(1L, "diaper", 4,1505528400L)//2017-09-16 10:20:00));Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());System.out.println(orders.getSchema());

来自网友的补充:

Hunter x Hunter:
$("rowtime2").proctime()是表示自动生成一个字段 rowtime2
Hunter x Hunter:
$("rowtime2").rowtime()是表示用已经存在的字段 rowtime2用作eventime

注意,时间属性不是数据库表格里面的一个字段。[1]

[1]Time Attributes

window expects a time attribute for grouping in a stream environment.相关推荐

  1. 053试题 158/449/637 - Scheduler Window

    题目: 158.You create two resource plans, one for data warehouse loading jobs at night and the other fo ...

  2. 彻底搞清 Flink 中的 Window 机制

    [CSDN 编者按]Window是处理无限流的核心.Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.Flink提 ...

  3. Flink Window Function

    窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数. 文章目录 1.增量聚合函数 1.1 ReduceFunction 1.2 AggregateFu ...

  4. 彻底搞清Flink中的Window(Flink版本1.8)

    flink-window 窗口 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理.当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分 ...

  5. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  6. Flink Window机制详解

    Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch ...

  7. 【基础】Flink -- Time and Window

    Flink -- Time and Window Flink 时间语义 水位线 Watermark 水位线的概念 有序流中的水位线 乱序流中的水位线 水位线的特性 水位线的基本使用 水位线生成策略 内 ...

  8. 数据挖掘流程_数据流挖掘

    数据挖掘流程 1-简介 (1- Introduction) The fact that the pace of technological change is at its peak, Silicon ...

  9. Bluetooth ATT介绍

    阅读目录 1 介绍 2 详细内容 3 Attribute PDU 4 Attribute Protocol PDU 回到顶部 1 介绍 ATT,Attribute Protocol,用于发现.读.写对 ...

最新文章

  1. C和C++的关键字区别
  2. Samba服务器问题汇总
  3. [C++STL]常用查找算法
  4. Django/Flask/Tornado三大web框架性能分析
  5. Java8————方法引用
  6. 先发不一定制人:美韩5G网络体验差遭吐槽
  7. javascript常见的数组方法
  8. 安装Exchange2007邮件系统
  9. 安卓pdf阅读器_PDF阅读用哪款软件好?推荐这7款,简单又好用!
  10. 三维激光LiDAR点云数据处理,我帮您!
  11. csdn七牛云存储作为网站的图片外链
  12. 10步搞定App内测发布(蒲公英内测平台)
  13. (六)springMvc 和 mybatis 整合
  14. 大数据学习总结(2021版)---Mysql基础
  15. java实现文本纠错功能_调用百度API进行文本纠错
  16. mysql创建新闻发布时间_基于PHP+mysql实现新闻发布系统的开发
  17. 切比雪夫,霍夫丁不等式证明
  18. Variant类型在各语言中的参数传递
  19. 供应链金融业务发展态势及提升路径
  20. 凡客“小米化”改造:雷军与陈年最基友的商业故事

热门文章

  1. Java程序员如何成为优秀的架构师
  2. python数学建模(三)插值常用库和模块
  3. 全国大学生智能汽车竞赛图像采集处理上位机开源!
  4. 【Visual Studio 2019 - Unknown override specifier error】Problems when compiling dbghelp.h
  5. java四大名著知乎_《西游记》是否被高估了?四大名著该如何排名?
  6. 什么是内卷?什么是囚徒困境?故事叙述
  7. #Vue3篇:watch、watchEffect、watchPostEffect、watchSyncEffect的区别
  8. ESP8266-WIFI模块使用AT指令连接外网服务器
  9. 窥一斑而知全豹,几分钟带你读懂Java字节码,再也不怕了
  10. This iPhone 8 Plus (Model A1864, A1897, A1898, A1899) is running iOS 12.2 (16E227)