Apache Flink 零基础入门(十三)Flink 计数器
需求:当一个文本文件进入时,有可能会有一些格式乱码的错误行,如何统计哪些错误行?如何提取错误行
def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements("hadoop","spark","pyspark", "storm")data.map(new RichMapFunction[String, Long] {var counter = 0loverride def map(value: String): Long = {counter = counter + 1println("counter:"+counter)counter}}).setParallelism(2).print()}
使用这种方式,设置并行度之后,无法正确统计。
正确的方式是通过定义Accumulator来进行计数操作。scala实现方式如下:
val info = data.map(new RichMapFunction[String, String] {// step1:定义计数器val counter = new LongCounter()override def open(parameters: Configuration): Unit = {// step2: 注册计数器getRuntimeContext.addAccumulator("ele-counts-scala", counter)}override def map(in: String): String = {counter.add(1)in}})info.writeAsText("E:/test3", WriteMode.OVERWRITE).setParallelism(4)val jobResult=env.execute("CounterApp")// step3: 获取计数器val num =jobResult.getAccumulatorResult[Long]("ele-counts-scala")println("num:" + num )
Java
public class JavaCounterApp {public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> data = executionEnvironment.fromElements("hadoop", "spark", "pyspark", "storm");DataSet dataSet = data.map(new RichMapFunction<String, String>() {LongCounter counter = new LongCounter();@Overridepublic void open(Configuration parameters) throws Exception {getRuntimeContext().addAccumulator("ele-counts-java",counter);}@Overridepublic String map(String value) throws Exception {counter.add(1);return value;}});dataSet.writeAsText("E:/test4", FileSystem.WriteMode.OVERWRITE).setParallelism(3);JobExecutionResult javaCounterApp = executionEnvironment.execute("JavaCounterApp");long num = javaCounterApp.getAccumulatorResult("ele-counts-java");System.out.println("num:" + num);}
}
Apache Flink 零基础入门(十三)Flink 计数器相关推荐
- Apache Flink 零基础入门【转】
Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...
- Apache Flink 零基础入门(十八)Flink Table APISQL
什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...
- Apache Flink 零基础入门(三)编写最简单的helloWorld
实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...
- Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)
数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...
- Apache Flink 零基础入门(一):基础概念解析
Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...
- Apache Flink 零基础入门(二十)Flink部署与作业的提交
之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...
- Apache Flink 零基础入门(二十)Flink kafka connector
内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...
- Apache Flink 零基础入门(十九)Flink windows和Time操作
Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...
- Apache Flink 零基础入门(十七)Flink 自定义Sink
需求:socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中. 创建数据库和表 create database imooc_flink; create ta ...
最新文章
- 这个奇葩打字外设火了,一分钟500词比说话还快,直接被打字比赛禁用
- easyui的Pagination单独使用
- HashMap 的使用
- [蓝桥杯2016初赛]寒假作业-next_permutation枚举
- 最全机器学习种类讲解:监督、无监督、在线和批量学习都讲明白了
- Linux开机启动过程(5):内核解压
- AJAX框架衣柜推拉门设计,带镜子的推拉门衣柜如何设计好看
- 深入理解二阶段提交协议(DDB对XA悬挂事务的处理分析)(一)
- 2008年上半年程序员考试上午真题自我汇总
- C#毕业设计——基于C#+ASP.NET+SQL Server的酒店入住信息管理系统设计与实现(毕业论文+程序源码)——酒店入住信息管理系统
- 代码本色 processing编程练习
- cesium实现鹰眼地图(三维)效果
- Python基于PyTorch实现BP神经网络ANN分类模型项目实战
- 开机出现“Disk I/O error”的故障解决
- c代码触发sysrq-trigger
- 怎样规划学习Linux,就业方向有哪些?
- 使用C++计算3次牛顿插值法
- Tecnomatix Plant Simulation 14 学习之路(四)
- 精品帖—matlab求解存在多个非线性不等式约束的多元约束优化问题方法
- open cyper还是open cypher,李逵or李鬼?