flink(一个流处理,一个批处理)
流处理,这里用netcat来完成
package com.smalltiger.flinkWCimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._/*** Created by smalltiger on 2019/11/6.* flink基于流处理的一个WordCount统计*/
object StreamWC {def main(args: Array[String]): Unit = {//从外部命令中获取参数var params: ParameterTool = ParameterTool.fromArgs(args)var host: String = params.get("host")var port: Int = params.getInt("port")//1.获取当前执行环境var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//接受socket文本流val textDstrem:DataStream[String] = env.socketTextStream(host,port)//flatMap和Map需要引入引入隐式转换import org.apache.flink.api.scala._val dataStream:DataStream[(String,Int)] = textDstrem.flatMap(_.split(" ")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)dataStream.print().setParallelism(1)//启动executor,执行任务env.execute("启动任务")}
}
批处理,直接处理文件内容
package com.smalltiger.flinkWC
import org.apache.flink.api.scala._/*** Created by smalltiger on 2019/11/6.* flink基于批处理统计wordcount*/
object WordCount {def main(args: Array[String]): Unit = {//创建执行环境val env = ExecutionEnvironment.getExecutionEnvironment//从文件中读取数据val inpath = "D:\\WorkSpace\\flinkWC\\src\\main\\resources\\abc.txt";var inputDS: DataSet[String] = env.readTextFile(inpath)//按照空格进行一个分词,对单词进行groupby分组,然后用sum进行一个聚合var wordCounts: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)wordCounts.print()}
}
flink(一个流处理,一个批处理)相关推荐
- Flink的流处理与批处理
Flink的流处理与批处理 Flink通过执行引擎,能够同时支持批处理与流处理任务. 在执行引擎这一层,流处理系统与批处理系统最大的不同在于节点的数据传输方式. 流处理系统 对于一个流处理系统,其节点 ...
- 橘子学Flink03之Flink的流处理与批处理
一.Flink 处理模型: flink的处理方式主要有两种是流处理与批处理.Flink 专注于无限流处理,有限流处理是无限流处理的一种特殊情况.可以通过调节阈值来设置多少数据处理一次,这是批处理的一种 ...
- c语言fgetc()函数(从指定的流 stream 获取下一个字符(一个无符号字符),并把位置标识符往前移动)
C 标准库 - <stdio.h> 文章目录 描述 声明 参数 返回值 实例 描述 C 库函数 int fgetc(FILE *stream) 从指定的流 stream 获取下一个字符(一 ...
- Boost:构造一个流对象,任何发送到此流将标准输出
Boost:构造一个流对象,任何发送到此流将标准输出 实现功能 C++实现代码 实现功能 构造一个流对象,任何发送到此流将标准输出 C++实现代码 #include "zfstream.h& ...
- 如何复位一个流的failbit和eofbit
如果复位一个流cin的failbit位和eofbit位,那么可以采用如下代码: (1)当复位cin的failbit时,可以用 cin.clear(cin.rdstate() & ~cin.fa ...
- 【Flink】FlinkConsumer是如何保证一个partition对应一个thread的
1.概述 我们都知道flink 连接kafka时,默认是一个partition对应一个thread,它究竟是怎么实现的呢?以及到我们自己定义 RichParallelSourceFunction 的时 ...
- HDU4183 Pahom on Water(来回走最大流,一个点只经过一次)
题意: 有n个圆,每个圆的中心和半径和一个频率都给定,只有一个频率最高的789为紫色,只有一个最低的400为红色,规则如下: 1.当两个圆严格相交时,且人是从红色到紫色的方向运动时可以由低频率向高频率 ...
- java 8流在另一个流_Java 8流– Java流
java 8流在另一个流 Welcome to Java 8 Stream API tutorial. In the last few java 8 posts, we looked into Jav ...
- java 8流在另一个流_Java 8流图
java 8流在另一个流 Java 8 Stream map function can be used to perform some operation on all of it's element ...
最新文章
- 分布式系统咋做同步?虐死人!
- 【报错】ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds t
- Cpp 对象模型探索 / 含有虚基类的类的内存布局
- 华为鸿蒙harmonyos-面向全场,华为正式官宣鸿蒙手机版相约6月2日,EMUI官博更名为HarmonyOS...
- BZOJ 2442: [Usaco2011 Open]修剪草坪 单调队列
- [LeetCode] Generate Parentheses
- 利用javascript onclick实现网页跳转
- maven依赖最佳实践_Maven最佳实践
- 国产数据库年终大盘点!
- Spring Boot对jsp的支持
- 《区块链+》读书感想
- 利用Scrapy编写“1024网站种子吞噬爬虫”,送福利
- 【读书笔记】码农翻身 - 简介
- 高级软件工程(2022春)课程总结
- [Python]查看Python版本
- 升级JFlash后无法连接JLINK问题处理
- 64位操作系统和32位的区别介绍【详解】
- 使用Makefile链接so库文件
- SQL字符串拼接 引号问题
- 【强化学习】Deep Q Network深度Q网络(DQN)
热门文章
- NOIP复习资料——往年习题精选
- 矩阵快速幂 HDU3483
- POJ 2429 GCD LCM Inverse ★(pollard-ρ DFS枚举)
- JavaScript 判断浏览器类型
- Hihocoder 1632 : Secret Poems 思维|技巧
- java垃圾回收机制_干货:Java 垃圾回收机制
- java 做项目踩坑,web项目踩坑过程
- STM32 USB虚拟串口原理(上)
- python随机划分数据集_Python之机器学习-sklearn生成随机数据
- 微星主板超频_内存超频能力依旧拔群!微星MEG Z490 ACE主板评测