上篇:union的使用

connect的使用

  • 可以将两个流不一致的类型连接一起
  • 可以共享状态

直接上代码

package cn._51doit.flink.day03;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/*** connect的使用* 程序操作:* nc -l 8888的窗口输入字符串,将会转成大写输出;* nc -l 8888的窗口输入数字,将会运算输出*/
public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> lines1 = env.socketTextStream("Master", 8888);SingleOutputStreamOperator<Integer> lines2 = env.socketTextStream("Master", 9999).map(s -> Integer.parseInt(s));//将两个流connect【连接】ConnectedStreams<String, Integer> connected = lines1.connect(lines2);SingleOutputStreamOperator<String> result = connected.map(new CoMapFunction<String, Integer, String>() {//对第一个流进行map运算的方法@Overridepublic String map1(String value) throws Exception {return value.toUpperCase();}//对第二个流进行map运算的方法@Overridepublic String map2(Integer value) throws Exception {return (value * 10) + "";}//这两个流的map方法执行完的返回值会放到新的流中});result.print();env.execute();}}

connect的使用相关推荐

  1. Docker使用遇到问题Got permission denied while trying to connect to the Docker daemon socket解决方案

    Got permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker. ...

  2. Can 't connect to local MySQL server through socket '/tmp/mysql.sock '(2)

    安装了mysql, 使用命令mysql -u root -p 弹出Can 't connect to local MySQL server through socket '/tmp/mysql.soc ...

  3. oracle 10035 err 942,案例:Oracle日志报错 Fatal NI connect error 12170 TNS-12535 TNS-00505

    天萃荷净 Oracle数据库alert日志文件报错Fatal NI connect error 12170,通过查看mos相关文章找到解决办法 今天在一台服务器的日志文件中,发现如下信息: Fatal ...

  4. socket connect阻塞和非阻塞处理

    建立socket后默认connect()函数为阻塞连接状态,在大多数实现中,connect的超时时间在75s至几分钟之间,想要缩短超时时间,可解决问题的两种方法:方法一.将socket句柄设置为非阻塞 ...

  5. 【Qt】Qt再学习(十六):QObject::connect: Cannot queue arguments of type ‘QString‘

    1.问题描述 跨线程使用信号和槽时,如果是非const的引用传参,就会报如下的错误: QObject::connect: Cannot queue arguments of type 'QString ...

  6. 【网络编程】非阻塞connect详解

    一.为什么使用非阻塞connect TCP连接的建立涉及一个在三路握手过程,阻塞的connect一直等到客户收到自己的SYN的ACK才返回,这需要至少一个RTT时间,RTT时间波动很大从几毫秒到几秒. ...

  7. pyqt designer connect无响应_如何用PyQt编写桌面程序,创建并打开播放列表?

    由于电脑上的短视频太多了,并且分别存放在各个子目录下,每次更新后想要整理视频比较麻烦,因此想用Python编写程序来辅助管理视频文件.其实写个Python脚本程序即可实现大器的需求,但为了多练习PyQ ...

  8. Fastlane 入门实战教程从打包到上传iTunes connect

    有关神器 Fastlane 持续集成\部署的文章网上挺多,本文定位是入门教程,针对 iOS 应用的持续部署,只需一条命令就可实现从 Xcode 项目到 编译\打包\构建\提交审核 文章稍微有点长,涵盖 ...

  9. 1.3 Quick Start中 Step 7: Use Kafka Connect to import/export data官网剖析(博主推荐)

    不多说,直接上干货! 一切来源于官网 http://kafka.apache.org/documentation/ Step 7: Use Kafka Connect to import/export ...

  10. netbackup错误之can not connect on socket(25)

    rhel5.5上安装netbackup 7.0,这个版本只能安装在64位系统上.安装完netbackup 7.0后,发现登录界面一直报java认证失败,查看了下日志文件,报如下内容: 查了下系统设置, ...

最新文章

  1. python threading多线程计算
  2. 【Codeforces 986B】Petr and Permutations
  3. Sublime Text 2 使用心得
  4. 安装 | OpenCV4.2.0 + VS2017安装教程
  5. Linux进程 excel族函数的用法
  6. 网页聊天室win10界面源码
  7. android消息机制 Message, Looper,Handler
  8. php session gc_maxlifetime,PHPsession 有效期 session.gc_maxlifetime
  9. 【2019-08-18】时间是有密度的
  10. python websockets(wss)
  11. masscan端口扫描
  12. WinRAR 密码 模板的秘密:
  13. 阿里技术专家十五问,真题面试刀刀见肉,走进面试间(答案解析)
  14. gulp+webpack工具整合简介
  15. 联想340c笔记本cpu能升级吗_笔记本电脑可以升级CPU吗
  16. python输入什么就输出什么意思_python中的输入与输出是什么?(实例详解)
  17. 表格提示html内容消失,如何解决Word里面的表格插入题注后页面上内容消失、无法编辑的问题...
  18. 别说华为语音助手不智能了,这3大隐藏功能都知道吗?实用又贴心
  19. 根据 commit message 自动生成 changelog
  20. mongoDB使用及简单命令(忘记了密码怎么办、mongoDB密码重置、创建数据库、mongoDB启动停止)

热门文章

  1. PS 学习笔记 08-矩形工具组
  2. Fuzzing101 Exercise 5 - LibXML2 学习笔记
  3. PHP中使用Imagick实现各种图片效果实例_php技巧
  4. HorizontalScrollView重置滑动位置问题
  5. AI时代下零售商的新商业模式
  6. 机器视觉软件能够做什么
  7. 读书笔记《Outlier Analysis》 第九章 时间序列和多维流的异常检测
  8. 无法更新sudo apt-get update(N: 无法安全地用该源进行更新,所以默认禁用该源。N: 参见 apt-secure(8) 手册以了解仓库创建和用户配置方面的细节)
  9. 快手滑块验证2023/01/31
  10. 北京突传大消息!又一“铁饭碗”将被砸!