Flink 创建流处理运行环境
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}/** DATE:2022/10/1 19:10 * AUTHOR:GX */ object streamWordCount {def main(args: Array[String]): Unit = {//创建流处理运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment // env.setParallelism(16) //设置当前并行度 //执行在哪一个线程是根据当前单词的哈希值决定的//从外部命令中提取参数作为socket主机名和端口号val paramTool:ParameterTool = ParameterTool.fromArgs(args)val host = paramTool.get("host")val port = paramTool.getInt("port")//接受一个socket文本流val inputDataStream:DataStream[String] = env.socketTextStream(host,port)//进行转换处理统计val resultDataStream:DataStream[(String,Int)] = inputDataStream.flatMap(x=>x.split(" ")).filter(_.nonEmpty).map((_,1)).keyBy(0) //分组基于当前key的hash值取模.sum(1)//打印输出resultDataStream.print()// .setParallelism(1) //输出数据可能乱序,网络传输的时候已经乱了,并行度为1时不乱,不能实现高并发,高吞吐量//启动一个进程,等待数据输入//启动任务执行env.execute("streamWordCount") /* 3> (hello,1) 前面的数字并行的线程 当前机器的核数5> (world,1)7> (flink,1)7> (flink,2)*/} }
--host master --port 7777
master: nc -lk 7777
Flink 创建流处理运行环境相关推荐
- OpenShift 4 - 创建Service Mesh运行环境
<OpenShift 4.x HOL教程汇总> 说明:本文已经在OpenShift 4.10环境中验证 文章目录 安装 OpenShift Service Mesh 和相关依赖 Opera ...
- OpenGL基础知识梳理——Windows上搭建opengles运行环境
1.概念介绍 1)OpenGLES 官方介绍:https://www.khronos.org/opengles/ OpenGLES(OpenGL for embeded systems)是用于嵌入式设 ...
- 2.树莓派4B 64位操作系统 从零搭建深度学习项目运行环境
树莓派的环境配置 文章目录 树莓派的环境配置 1.系统烧录 1.1 系统选择 1.1.1 Raspbian OS:官方的树莓派操作系统 1.1.2 Ubuntu MATE:适合通用计算需求 1.1.3 ...
- Jupyter notebook 运行环境创建和切换 (Win10+Anaconda)
1. 激活Jupyter notebook运行需要的虚拟环境 activate pytorch-gpu pytorch-gpu 为环境名 2. 在当前环境下安装 Jupyter (pytorch-gp ...
- OpenShift 4 - Knative教程 (1) 创建Serverless运行环境
<OpenShift 4.x HOL教程汇总> 说明:本文已经在OpenShift 4.10环境中验证 文章目录 说明 客户端环境 配置Knative环境 安装OpenShift Serv ...
- python 的 virtualenv 环境搭建及 sublime 手动创建运行环境
一.安装 virtual env sudo pip install virtualenv 二.进入一个空白的目录初始化 virtual env 的环境 cd ~/workspace/python/ v ...
- 使用CentOS7创建Docker运行环境
使用CentOS7创建Docker运行环境 目录: 一.网络基础设置: 1.1 IP地址.网关 1.2 hostname.DNS 1.3 Host主机列表文件 1.4检查IP地址配置 二.安全及其他设 ...
- oracle最佳环境,创建最适合的Oracle运行环境
在Oracle数据库中,提供了一套默认的用户操作环境.如用户查询的时候,从数据库中一次提取的行数;列之间的分隔符;每行显示的最大宽度; 每页默认显示的行数等等.这些都是靠数据库的环境变量来控制.这些参 ...
- Flink教程(10)- Flink批流一体API(其它)
文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...
最新文章
- Centos6.5升级系统自带gcc4.4.7到gcc4.8.0
- 线程访问临界区的问题 实例,需解决
- 在家也能做化学实验!VR教育机构MEL Science获250万美元融资
- linux ssh互免密配置
- 783. 二叉搜索树节点最小距离
- 阿里云Tech Insight 企业迁云实战专场强势来袭!
- java当前时间查询,Java实现查询记录的时间相对于当前时间
- iOS 代码命名规范 及Android 代码命名规范(2)Android
- 拓客系统专用服务器,北京拓客系统
- python 实例化对象_python如何实例化对象
- 工业级串口Modbus数据绘制曲线及上位机监控软件DotTrend
- 开发用的到java数组吗_java数组
- 爬虫抓取暗黑3玩家数据
- java json 乱码问题_java中json传输数据乱码问题
- PAT乙级 1072 开学寄语
- 使用Python实现微信发送文本消息、图片以及附件
- 基础测绘1:10000成果检验需注意的问题
- vue番茄钟 electron打包
- 红米2 手机root
- 弘辽科技:拼多多没出单改销量吗?拼多多如何提高销量?
热门文章
- 美团 CEO 王兴:从 0 到干到 300 亿美金,格局上输了,再多努力都不可能赢
- 基于Java的网络编程实践
- 【公平锁和非公平锁有什么区别?】
- Python调用百度AI,实现音频转换文字(标准版)
- linux 行首加特定字符_linux shell 用sed命令在文本的行尾或行首添加字符
- 使用Post不传Body,出现socket hang up报错
- 关于报错An unexpected error occurred: “https://registry.yarnpkg.com/react: socket hang up“
- 关于onKeyDown方法
- c语言stdoux串口流,嵌入式C语言代码优化的一些经验
- 2018年 10月份开始执行的最新税率表 2019年1月 小孩住房贷款等怎么扣