flink map返回Tuple3时,如果不指定returns则会报错

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
Properties kafkaProp = new Properties();
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), kafkaProp);DataStream<Tuple3<Integer, String, Integer>> dataStream = env.addSource(myConsumer).map(record -> {JSONObject jsonObject = JSON.parseObject(record);return new Tuple3<>(jsonObject.getInteger("id"), jsonObject.getString("name"), jsonObject.getInteger("age"));});
env.execute();

运行上述代码,错误信息如下:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(TestFlinkTable.java:43)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)at com.miaoke.sync.test.TestFlinkTable.main(TestFlinkTable.java:50)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple3' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:350)at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:579)at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:175)at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:585)at com.miaoke.sync.test.TestFlinkTable.main(TestFlinkTable.java:43)

根据错误提示,加上returns(),则正常通过

DataStream<Tuple3<Integer, String, Integer>> dataStream = env.addSource(myConsumer).map(record -> {JSONObject jsonObject = JSON.parseObject(record);return new Tuple3<>(jsonObject.getInteger("id"), jsonObject.getString("name"), jsonObject.getInteger("age"));}).returns(Types.TUPLE(Types.INT, Types.STRING, Types.INT));

在一般情况下,Java会擦除泛型类型信息。 Flink尝试使用Java保留的少量位(主要是函数签名和子类信息)通过反射重建尽可能多的类型信息。对于函数的返回类型取决于其输入类型的情况,此逻辑还包含一些简单类型推断:

public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {public Tuple2<T, Long> map(T value) {return new Tuple2<T, Long>(value, 1L);}
}

在Flink无法重建已擦除的泛型类型信息的情况下,Java API提供所谓的类型提示。类型提示告诉系统函数生成的数据流或数据集的类型:

DataSet<SomeType> result = dataSet.map(new MyGenericNonInferrableFunction<Long, SomeType>()).returns(SomeType.class);

flink DataStream returns 设置返回类型相关推荐

  1. Retrofit2设置返回类型是字符串(String)类型

    之前返回的是对象,但是我想看看返回的字符串. 之前的代码: package com.example.z.study;import android.os.Bundle; import android.s ...

  2. C++ 返回类型协变

    在C++中,只要原来的返回类型是指向类的指针或引用,新的返回类型是指向派生类的指针或引用,覆盖的方法就可以改变返回类型.这样的类型称为协变返回类型(Covariant returns type). 返 ...

  3. java返回类型自动_java-Apache Flink:由于类型擦除,无法自动确定函数的返回类型...

    我在Java中用Flink编写了一个简单的程序,它以文件或文本作为输入,然后使用flatMap函数打印所有单词. 这是我的代码: final ParameterTool params = Parame ...

  4. MySQL函数(CREATE FUNCTION 函数名(参数列表) RETURNS 返回类型)

    MySQL函数 /* 含义:一组预先编译好的SQL语句的集合,理解成批处理语句 1.提高代码的重用性 2.简化操作 3.减少了编译次数并且减少了和数据库服务器的连接次数,提高了效率 区别: 存储过程: ...

  5. IDEA 方法返回值和返回类型自动补全快捷键设置

    IDEA 方法返回值和返回类型自动补全快捷键设置 今天遇到一个很奇怪的问题,我新装的IDEA,默认的返回类型补全快捷键应该是 Ctrl + Alt +V 但是怎么也不起作用了,所以想去设置一下,然后找 ...

  6. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

  7. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  8. asp.net core系列 38 WebAPI 返回类型与响应格式--必备

    一.返回类型 ASP.NET Core 提供以下 Web API Action方法返回类型选项,以及说明每种返回类型的最佳适用情况: (1) 固定类型 (2) IActionResult (3) Ac ...

  9. Flink DataStream 编程入门

    流处理是 Flink 的核心,流处理的数据集用 DataStream 表示.数据流从可以从各种各样的数据源中创建(消息队列.Socket 和 文件等),经过 DataStream 的各种 transf ...

最新文章

  1. 使用Python,OpenCV实现图像之间超快速的颜色转移
  2. gettimeofday
  3. Linux /dev目录详解和Linux系统各个目录的作用
  4. JDK10的新特性:var泛型和多个接口实现
  5. power bi_如何将Power BI模型的尺寸减少90%!
  6. unity3d Json解析工具类
  7. mysql死锁分析_MySQL死锁分析
  8. Android的JNI【实战教程】2⃣️--AS下NDK环境配置及第一个工程
  9. gcms基峰有什么用_金鉴实验室 | 气相质谱(GCMS)
  10. c lua mysql 扩展_lua安装及luarocks安装mysql扩展
  11. SIEBEL代码分析
  12. 如何提升原创文章排名与百度冰桶算法
  13. VisionPro如何引用VPP?
  14. 尚硅谷nodejs入门教程_笔记
  15. 一文看懂摄像头测距技术
  16. STM32之DAC音频播放
  17. 如何做好SEM竞价营销
  18. 淘宝API接口:获取sku详细信息
  19. dubbo系列三、 服务发现RegistryDirectory
  20. 用 Vue 改造 Bootstrap,渐进提升项目框架[转]

热门文章

  1. TCP/IP 1.概述
  2. 学军OJ题解——1179 约会
  3. 去掉Holo主题下Dialog的蓝色线
  4. 《算法和数据结构》排序篇
  5. 从零开始学架构 01-架构基础【笔记】
  6. 四柱排盘系统--命理学和程序开发的结合
  7. 阿里云服务器实时计算Flink/Blink首选大数据型d2c、d2s实例
  8. matlab归一化方法,数据归一化的基本方法
  9. 个人计算机中的防毒软件无法防御,win10系统中无法启动defender防御软件的处理办法...
  10. 计算机硬件知识比赛策划,计算机硬件知识讲座活动策划案.doc