上篇:connect的使用

project [DataStream → DataStream]

project 主要用于获取 tuples 中的指定字段集

    public static void project(StreamExecutionEnvironment env) {DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(new Tuple3<>("li", 22, "2018-09-23"),new Tuple3<>("ming", 33, "2020-09-23"));streamSource.project(0, 2).print();}

运行结果:

 (ming,2020-09-23)(li,2018-09-23)

来一个实例,直接上代码:

package cn._51doit.flink.day03;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/*** project的使用* 有界流** 直接运行,控制台打印输出:* 2> (99.99,laoduan)* 1> (9999.99,laozhao)*/
public class ProjectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> lines = env.fromElements("laozhao,18,9999.99","laoduan,28,99.99");SingleOutputStreamOperator<Tuple3<String, Integer, Double>> tpDataStream = lines.map(e -> {String[] fields = e.split(",");return Tuple3.of(fields[0], Integer.parseInt(fields[1]), Double.parseDouble(fields[2]));}).returns(Types.TUPLE(Types.STRING,Types.INT,Types.DOUBLE));SingleOutputStreamOperator<Tuple> projected = tpDataStream.project(2, 0);projected.print();env.execute();}
}

代码改造后:

package cn._51doit.flink.day03;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** project的使用:有界流** 直接运行,控制台打印输出[代码改造之前]* 4> (99.99,laoduan)* 3> (9999.99,laozhao)** 直接运行,控制台打印输出[代码改造之后]* 3> (9999.99,laozhao)* 4> (99.99,laoduan)*/
public class ProjectDemo_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> lines = env.fromElements("laozhao,18,9999.99","laoduan,28,99.99");SingleOutputStreamOperator<Tuple3<String, Integer, Double>> tpDataStream = lines.map(e -> {String[] fields = e.split(",");return Tuple3.of(fields[0], Integer.parseInt(fields[1]), Double.parseDouble(fields[2]));}).returns(Types.TUPLE(Types.STRING,Types.INT,Types.DOUBLE));SingleOutputStreamOperator<Tuple2<Double, String>> projected = tpDataStream.map(new MapFunction<Tuple3<String, Integer, Double>, Tuple2<Double, String>>() {@Overridepublic Tuple2<Double, String> map(Tuple3<String, Integer, Double> value) throws Exception {return Tuple2.of(value.f2, value.f0);}});projected.print();env.execute();}
}

project的使用相关推荐

  1. ideal如何创建dynamic web project

    步骤如下 ① file -> new -> project ② 选择 Java Enterprise -> next ③ create project from template - ...

  2. This version of Android Studio cannot open this project, please retry with Android Studio 3.5 or new

    今天github 下载一个库 导入 as 提示 This version of Android Studio cannot open this project, please retry with A ...

  3. Error:The SDK Build Tools revision (23.0.3) is too low for project ':app'. Minimum required is 25.0.

    导入github上项目的时候出现 Error:The SDK Build Tools revision (23.0.3) is too low for project ':app'. Minimum ...

  4. Error:(49, 1) A problem occurred evaluating project ':guideview'. Could not read script 'https://r

    出现问题如下: Error:(49, 1) A problem occurred evaluating project ':guideview'. > Could not read script ...

  5. IntelliJ IDEA 的Project structure说明

    IntelliJ IDEA 的Project structure可以在File->Project structure中打开,同时,在新建项目是IDE一般用向导的方式让你填写Project str ...

  6. 将Project的内容导出成单独的XPO文件

    AX跟VSS整合的版本管理可以通过创建知识库将当前层的代码全部签入到VSS中,但是如果不是一个团队开发solution,而是针对客户的需求随时做得一些小改动,一般都希望以Project的形式组织代码和 ...

  7. linux vim project,vim插件project的用法

    用任何编辑器写代码,文件管理的方便与否对编码效率影响很大.一般的IDE都有文件管理功能,并且用来的不错.在vim中,要实现较好的文件管理功能一般都靠插件.在有米实习的第一个月,自己一直用NERDTre ...

  8. android studio 同类,让Android Studio的Project视图和Anroid视图类似

    关于AS的Project与Android视图的不同,可以看我的博文 http://blog.csdn.net/siyehuazhilian/article/details/42123563 Andro ...

  9. invalid project description._[Project教程] 在Project软件中如何处理加班工时

    在微软Project软件中有处理加班工时的功能,但是我在系统课程中没有讲,为什么呢?就像Project软件中的[进度线]功能,这个功能还不够完善,所以暂时不建议大家使用.加班工时这个问题也是如此,总体 ...

  10. There's no Qt version assigned to this project for platform Win32/Win64

    Qt5.8+ vs2010和vs2015都出现这个问题,无论是32bit还是64bit平台 解决方法: 1:注意:打开main.cpp文件,否则 "Qt Project Settings&q ...

最新文章

  1. javascript自定义cookie
  2. C/C++ 指针函数 与 函数指针
  3. php new redis错误,解决PHP Redis扩展无法加载的问题(zend_new_interned_string in Unknown on line 0)...
  4. Poj 3522 最长边与最短边差值最小的生成树
  5. 【Android 热修复】热修复原理 ( 合并两个 Element[] dexElements | 自定义 Application 加载 Dex 设置 | 源码资源 )
  6. python正则表达式中的转义字符_详解python中正则表达式的反斜线的转义功能
  7. python super()(转载)
  8. Transformer太深不行?NUS字节发现注意力坍缩,提出重注意机制!
  9. 代码质量第 5 层 - 只是实现了功能
  10. 神经网络的激活函数、并通过python实现激活函数
  11. 吴恩达深度学习神经网络基础编程作业Planar data classification with one hidden layer
  12. 自制Dede网站地图的秘诀
  13. 蛋白质中二硫键特征的质谱分析技术及其应用
  14. Windows系统蓝屏机制
  15. 带农历日历的DatePicker控件!Xamarin控件开发小记
  16. 咏南中间件2019新年促销
  17. LINUX Ubuntu 平台上安装PGI的pgf90编译器
  18. 应用进程和内核的关系
  19. 玩游戏的时候怎么没觉得补刀这么难…
  20. 错误:“java:找不到符号“

热门文章

  1. 根据二叉树遍历结果反推树
  2. 市场整改篇之应用宝报告
  3. RUST——互斥锁的使用
  4. mtk+android+之mt6577驱动笔记,MTK6577+Android之音频(audio)移植
  5. IntelliJ IDEA 15款 神级超级牛逼插件推荐(自用,真的超级牛逼)
  6. 光盘加密软件测试自学,SecurDisc光盘加密功能实战
  7. Android Nes模拟器,nes/fc任天堂/小霸王游戏SDK集成
  8. 卡尔曼滤波 | Matlab实现非线性卡尔曼滤波(Nonlinear KF)
  9. xp显示rpc服务器不可用,绿茶XP系统提示“RPC服务器不可用”如何解决
  10. php 模拟 cas,CAS的PHP客户端实践:PHP程序实现单点登录