流处理,这里用netcat来完成

package com.smalltiger.flinkWCimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._/*** Created by smalltiger on 2019/11/6.* flink基于流处理的一个WordCount统计*/
object StreamWC {def main(args: Array[String]): Unit = {//从外部命令中获取参数var params: ParameterTool = ParameterTool.fromArgs(args)var host: String = params.get("host")var port: Int = params.getInt("port")//1.获取当前执行环境var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//接受socket文本流val textDstrem:DataStream[String] = env.socketTextStream(host,port)//flatMap和Map需要引入引入隐式转换import org.apache.flink.api.scala._val dataStream:DataStream[(String,Int)] = textDstrem.flatMap(_.split(" ")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)dataStream.print().setParallelism(1)//启动executor,执行任务env.execute("启动任务")}
}

批处理,直接处理文件内容

package com.smalltiger.flinkWC
import org.apache.flink.api.scala._/*** Created by smalltiger on 2019/11/6.* flink基于批处理统计wordcount*/
object WordCount {def main(args: Array[String]): Unit = {//创建执行环境val env = ExecutionEnvironment.getExecutionEnvironment//从文件中读取数据val inpath = "D:\\WorkSpace\\flinkWC\\src\\main\\resources\\abc.txt";var inputDS: DataSet[String] = env.readTextFile(inpath)//按照空格进行一个分词,对单词进行groupby分组,然后用sum进行一个聚合var wordCounts: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)wordCounts.print()}
}


flink(一个流处理,一个批处理)相关推荐

  1. Flink的流处理与批处理

    Flink的流处理与批处理 Flink通过执行引擎,能够同时支持批处理与流处理任务. 在执行引擎这一层,流处理系统与批处理系统最大的不同在于节点的数据传输方式. 流处理系统 对于一个流处理系统,其节点 ...

  2. 橘子学Flink03之Flink的流处理与批处理

    一.Flink 处理模型: flink的处理方式主要有两种是流处理与批处理.Flink 专注于无限流处理,有限流处理是无限流处理的一种特殊情况.可以通过调节阈值来设置多少数据处理一次,这是批处理的一种 ...

  3. c语言fgetc()函数(从指定的流 stream 获取下一个字符(一个无符号字符),并把位置标识符往前移动)

    C 标准库 - <stdio.h> 文章目录 描述 声明 参数 返回值 实例 描述 C 库函数 int fgetc(FILE *stream) 从指定的流 stream 获取下一个字符(一 ...

  4. Boost:构造一个流对象,任何发送到此流将标准输出

    Boost:构造一个流对象,任何发送到此流将标准输出 实现功能 C++实现代码 实现功能 构造一个流对象,任何发送到此流将标准输出 C++实现代码 #include "zfstream.h& ...

  5. 如何复位一个流的failbit和eofbit

    如果复位一个流cin的failbit位和eofbit位,那么可以采用如下代码: (1)当复位cin的failbit时,可以用 cin.clear(cin.rdstate() & ~cin.fa ...

  6. 【Flink】FlinkConsumer是如何保证一个partition对应一个thread的

    1.概述 我们都知道flink 连接kafka时,默认是一个partition对应一个thread,它究竟是怎么实现的呢?以及到我们自己定义 RichParallelSourceFunction 的时 ...

  7. HDU4183 Pahom on Water(来回走最大流,一个点只经过一次)

    题意: 有n个圆,每个圆的中心和半径和一个频率都给定,只有一个频率最高的789为紫色,只有一个最低的400为红色,规则如下: 1.当两个圆严格相交时,且人是从红色到紫色的方向运动时可以由低频率向高频率 ...

  8. java 8流在另一个流_Java 8流– Java流

    java 8流在另一个流 Welcome to Java 8 Stream API tutorial. In the last few java 8 posts, we looked into Jav ...

  9. java 8流在另一个流_Java 8流图

    java 8流在另一个流 Java 8 Stream map function can be used to perform some operation on all of it's element ...

最新文章

  1. 分布式系统咋做同步?虐死人!
  2. 【报错】ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds t
  3. Cpp 对象模型探索 / 含有虚基类的类的内存布局
  4. 华为鸿蒙harmonyos-面向全场,华为正式官宣鸿蒙手机版相约6月2日,EMUI官博更名为HarmonyOS...
  5. BZOJ 2442: [Usaco2011 Open]修剪草坪 单调队列
  6. [LeetCode] Generate Parentheses
  7. 利用javascript onclick实现网页跳转
  8. maven依赖最佳实践_Maven最佳实践
  9. 国产数据库年终大盘点!
  10. Spring Boot对jsp的支持
  11. 《区块链+》读书感想
  12. 利用Scrapy编写“1024网站种子吞噬爬虫”,送福利
  13. 【读书笔记】码农翻身 - 简介
  14. 高级软件工程(2022春)课程总结
  15. [Python]查看Python版本
  16. 升级JFlash后无法连接JLINK问题处理
  17. 64位操作系统和32位的区别介绍【详解】
  18. 使用Makefile链接so库文件
  19. SQL字符串拼接 引号问题
  20. 【强化学习】Deep Q Network深度Q网络(DQN)

热门文章

  1. NOIP复习资料——往年习题精选
  2. 矩阵快速幂 HDU3483
  3. POJ 2429 GCD LCM Inverse ★(pollard-ρ DFS枚举)
  4. JavaScript 判断浏览器类型
  5. Hihocoder 1632 : Secret Poems 思维|技巧
  6. java垃圾回收机制_干货:Java 垃圾回收机制
  7. java 做项目踩坑,web项目踩坑过程
  8. STM32 USB虚拟串口原理(上)
  9. python随机划分数据集_Python之机器学习-sklearn生成随机数据
  10. 微星主板超频_内存超频能力依旧拔群!微星MEG Z490 ACE主板评测