需求:当一个文本文件进入时,有可能会有一些格式乱码的错误行,如何统计哪些错误行?如何提取错误行

  def main(args: Array[String]): Unit = {val env  = ExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements("hadoop","spark","pyspark", "storm")data.map(new RichMapFunction[String, Long] {var counter = 0loverride def map(value: String): Long = {counter = counter + 1println("counter:"+counter)counter}}).setParallelism(2).print()}

使用这种方式,设置并行度之后,无法正确统计。

正确的方式是通过定义Accumulator来进行计数操作。scala实现方式如下:

val info = data.map(new RichMapFunction[String, String] {// step1:定义计数器val counter  = new LongCounter()override def open(parameters: Configuration): Unit = {// step2: 注册计数器getRuntimeContext.addAccumulator("ele-counts-scala", counter)}override def map(in: String): String = {counter.add(1)in}})info.writeAsText("E:/test3", WriteMode.OVERWRITE).setParallelism(4)val jobResult=env.execute("CounterApp")// step3: 获取计数器val num =jobResult.getAccumulatorResult[Long]("ele-counts-scala")println("num:" + num )

Java

public class JavaCounterApp {public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> data = executionEnvironment.fromElements("hadoop", "spark", "pyspark", "storm");DataSet dataSet = data.map(new RichMapFunction<String, String>() {LongCounter counter = new LongCounter();@Overridepublic void open(Configuration parameters) throws Exception {getRuntimeContext().addAccumulator("ele-counts-java",counter);}@Overridepublic String map(String value) throws Exception {counter.add(1);return value;}});dataSet.writeAsText("E:/test4", FileSystem.WriteMode.OVERWRITE).setParallelism(3);JobExecutionResult javaCounterApp = executionEnvironment.execute("JavaCounterApp");long num = javaCounterApp.getAccumulatorResult("ele-counts-java");System.out.println("num:" + num);}
}

Apache Flink 零基础入门(十三)Flink 计数器相关推荐

  1. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  2. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  3. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  4. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  5. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  6. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

  7. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  8. Apache Flink 零基础入门(十九)Flink windows和Time操作

    Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...

  9. Apache Flink 零基础入门(十七)Flink 自定义Sink

    需求:socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中. 创建数据库和表 create database imooc_flink; create ta ...

最新文章

  1. 这个奇葩打字外设火了,一分钟500词比说话还快,直接被打字比赛禁用
  2. easyui的Pagination单独使用
  3. HashMap 的使用
  4. [蓝桥杯2016初赛]寒假作业-next_permutation枚举
  5. 最全机器学习种类讲解:监督、无监督、在线和批量学习都讲明白了
  6. Linux开机启动过程(5):内核解压
  7. AJAX框架衣柜推拉门设计,带镜子的推拉门衣柜如何设计好看
  8. 深入理解二阶段提交协议(DDB对XA悬挂事务的处理分析)(一)
  9. 2008年上半年程序员考试上午真题自我汇总
  10. C#毕业设计——基于C#+ASP.NET+SQL Server的酒店入住信息管理系统设计与实现(毕业论文+程序源码)——酒店入住信息管理系统
  11. 代码本色 processing编程练习
  12. cesium实现鹰眼地图(三维)效果
  13. Python基于PyTorch实现BP神经网络ANN分类模型项目实战
  14. 开机出现“Disk I/O error”的故障解决
  15. c代码触发sysrq-trigger
  16. 怎样规划学习Linux,就业方向有哪些?
  17. 使用C++计算3次牛顿插值法
  18. Tecnomatix Plant Simulation 14 学习之路(四)
  19. 精品帖—matlab求解存在多个非线性不等式约束的多元约束优化问题方法
  20. open cyper还是open cypher,李逵or李鬼?

热门文章

  1. servlet action is not available
  2. springcloud----负载均衡--Ribbon与LoadBalance
  3. ThinkPHP5跨控制器调用
  4. 使用svn diff的-r参数的来比较任意两个版本的差异
  5. JQUERY的appendappendTo
  6. 微信小程序长按图片,实现保存、转发、识别图中二维码
  7. 非计算机专业的学生,简谈非计算机专业的计算机教育
  8. mysql opensuse_opensuse免安装mysql
  9. ios调用restful接口_Postman调用https异常解决
  10. eclipse正则表达式查找