connect的使用
上篇:union的使用
connect的使用
- 可以将两个流不一致的类型连接一起
- 可以共享状态
直接上代码
package cn._51doit.flink.day03;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/*** connect的使用* 程序操作:* nc -l 8888的窗口输入字符串,将会转成大写输出;* nc -l 8888的窗口输入数字,将会运算输出*/
public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> lines1 = env.socketTextStream("Master", 8888);SingleOutputStreamOperator<Integer> lines2 = env.socketTextStream("Master", 9999).map(s -> Integer.parseInt(s));//将两个流connect【连接】ConnectedStreams<String, Integer> connected = lines1.connect(lines2);SingleOutputStreamOperator<String> result = connected.map(new CoMapFunction<String, Integer, String>() {//对第一个流进行map运算的方法@Overridepublic String map1(String value) throws Exception {return value.toUpperCase();}//对第二个流进行map运算的方法@Overridepublic String map2(Integer value) throws Exception {return (value * 10) + "";}//这两个流的map方法执行完的返回值会放到新的流中});result.print();env.execute();}}
connect的使用相关推荐
- Docker使用遇到问题Got permission denied while trying to connect to the Docker daemon socket解决方案
Got permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker. ...
- Can 't connect to local MySQL server through socket '/tmp/mysql.sock '(2)
安装了mysql, 使用命令mysql -u root -p 弹出Can 't connect to local MySQL server through socket '/tmp/mysql.soc ...
- oracle 10035 err 942,案例:Oracle日志报错 Fatal NI connect error 12170 TNS-12535 TNS-00505
天萃荷净 Oracle数据库alert日志文件报错Fatal NI connect error 12170,通过查看mos相关文章找到解决办法 今天在一台服务器的日志文件中,发现如下信息: Fatal ...
- socket connect阻塞和非阻塞处理
建立socket后默认connect()函数为阻塞连接状态,在大多数实现中,connect的超时时间在75s至几分钟之间,想要缩短超时时间,可解决问题的两种方法:方法一.将socket句柄设置为非阻塞 ...
- 【Qt】Qt再学习(十六):QObject::connect: Cannot queue arguments of type ‘QString‘
1.问题描述 跨线程使用信号和槽时,如果是非const的引用传参,就会报如下的错误: QObject::connect: Cannot queue arguments of type 'QString ...
- 【网络编程】非阻塞connect详解
一.为什么使用非阻塞connect TCP连接的建立涉及一个在三路握手过程,阻塞的connect一直等到客户收到自己的SYN的ACK才返回,这需要至少一个RTT时间,RTT时间波动很大从几毫秒到几秒. ...
- pyqt designer connect无响应_如何用PyQt编写桌面程序,创建并打开播放列表?
由于电脑上的短视频太多了,并且分别存放在各个子目录下,每次更新后想要整理视频比较麻烦,因此想用Python编写程序来辅助管理视频文件.其实写个Python脚本程序即可实现大器的需求,但为了多练习PyQ ...
- Fastlane 入门实战教程从打包到上传iTunes connect
有关神器 Fastlane 持续集成\部署的文章网上挺多,本文定位是入门教程,针对 iOS 应用的持续部署,只需一条命令就可实现从 Xcode 项目到 编译\打包\构建\提交审核 文章稍微有点长,涵盖 ...
- 1.3 Quick Start中 Step 7: Use Kafka Connect to import/export data官网剖析(博主推荐)
不多说,直接上干货! 一切来源于官网 http://kafka.apache.org/documentation/ Step 7: Use Kafka Connect to import/export ...
- netbackup错误之can not connect on socket(25)
rhel5.5上安装netbackup 7.0,这个版本只能安装在64位系统上.安装完netbackup 7.0后,发现登录界面一直报java认证失败,查看了下日志文件,报如下内容: 查了下系统设置, ...
最新文章
- python threading多线程计算
- 【Codeforces 986B】Petr and Permutations
- Sublime Text 2 使用心得
- 安装 | OpenCV4.2.0 + VS2017安装教程
- Linux进程 excel族函数的用法
- 网页聊天室win10界面源码
- android消息机制 Message, Looper,Handler
- php session gc_maxlifetime,PHPsession 有效期 session.gc_maxlifetime
- 【2019-08-18】时间是有密度的
- python websockets(wss)
- masscan端口扫描
- WinRAR 密码 模板的秘密:
- 阿里技术专家十五问,真题面试刀刀见肉,走进面试间(答案解析)
- gulp+webpack工具整合简介
- 联想340c笔记本cpu能升级吗_笔记本电脑可以升级CPU吗
- python输入什么就输出什么意思_python中的输入与输出是什么?(实例详解)
- 表格提示html内容消失,如何解决Word里面的表格插入题注后页面上内容消失、无法编辑的问题...
- 别说华为语音助手不智能了,这3大隐藏功能都知道吗?实用又贴心
- 根据 commit message 自动生成 changelog
- mongoDB使用及简单命令(忘记了密码怎么办、mongoDB密码重置、创建数据库、mongoDB启动停止)
热门文章
- PS 学习笔记 08-矩形工具组
- Fuzzing101 Exercise 5 - LibXML2 学习笔记
- PHP中使用Imagick实现各种图片效果实例_php技巧
- HorizontalScrollView重置滑动位置问题
- AI时代下零售商的新商业模式
- 机器视觉软件能够做什么
- 读书笔记《Outlier Analysis》 第九章 时间序列和多维流的异常检测
- 无法更新sudo apt-get update(N: 无法安全地用该源进行更新,所以默认禁用该源。N: 参见 apt-secure(8) 手册以了解仓库创建和用户配置方面的细节)
- 快手滑块验证2023/01/31
- 北京突传大消息!又一“铁饭碗”将被砸!