Apache Flink 是以高效、可扩展方式处理海量数据的大数据处理框架。本文介绍它的一些核心概念,以及标准数据转换Java版本api,这些API以流畅的方式可以很容易使用Flink的核心数据结构——分布式集合。
首先介绍Flink DataSet API实现统计单词频次程序,然后简要看下用于实时流式数据处理的DataStream API。

maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.2.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils_2.10</artifactId><version>1.2.0</version><scope>test<scope>
</dependency>

核心API概念

使用Flink时,选哟知道一些API相关的概念:

  • 每个在分布式集合数据执行转换程序,需要使用多个转换数据函数,包括:filtering, mapping, joining, grouping, and aggregating。
  • Flink中sink操作触发流执行产生程序期望的结果,例如,将结果保存到文件系统或打印到标准输出。
  • Flink转换是懒执行,意味着知道sink操作执行才会真正执行。
  • Flink API支持两种模式——批处理和实时处理。对于有限数据源使用批模式,使用DataSet API;处理无界实时流数据,应该DataStream API。

DataSet API转换数据

Flink程序的入口点是ExecutionEnvironment 类的实例, 它定义了程序执行的上下文。下面创建ExecutionEnvironment对下并开始处理数据:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

注:当你在本地机器上启动程序,则仅在本地JVM上执行处理。如果需要在集群环境中启动处理,则应该在集群中每个服务器上按照Apache Flink并配置相应ExecutionEnvironment。

创建数据集(DataSet)

要执行数据转换,需要提供数据。下面使用ExecutionEnvironement创建DataSet class :

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

也可以从其他数据源创建数据集,如Apache Kafka、CSV文件或其他数据源。

过滤和归约

准备好数据集,就可以进行过滤和转换。假设我们需要根据某阈值进行过滤,然后对过滤后的数据进行累加。则可以使用 filter() 和 reduce() 函数实现:

int threshold = 30;
List<Integer> collect = amounts.filter(a -> a > threshold).reduce((integer, t1) -> integer + t1).collect();assertThat(collect.get(0)).isEqualTo(90);

注:collect()方法是sink操作,它实际触发数据转换。

map映射

假设我们有Person对象数据集:

private static class Person {private int age;private String name;// standard constructors/getters/setters
}

接着创建该对象的数据集:

DataSet<Person> personDataSource = env.fromCollection(Arrays.asList(new Person(23, "Tom"),new Person(75, "Michael")));

如果我们仅需要每个对象的age属性,可以使用map转换方法实现:

List<Integer> ages = personDataSource.map(p -> p.age).collect();assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);

join方法

可以对两个数据集基于ID字段进行关联操作,实现连接转换。下面创建用户的事务和地址数据集:

Tuple3<Integer, String, String> address= new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses= env.fromElements(address);Tuple2<Integer, String> firstTransaction = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

两个元组的第一个字段都是整型,这是连接两个数据集的ID字段。为了执行实际连接逻辑,需要实现地址和事务数据集的KeySelector接口:

private static class IdKeySelectorTransaction implements KeySelector<Tuple2<Integer, String>, Integer> {@Overridepublic Integer getKey(Tuple2<Integer, String> value) {return value.f0;}
}private static class IdKeySelectorAddress implements KeySelector<Tuple3<Integer, String, String>, Integer> {@Overridepublic Integer getKey(Tuple3<Integer, String, String> value) {return value.f0;}
}

每个选择器只返回应该执行联接的字段。不幸的是不能使用lambda表达式简化实现,应该Flink需要泛型类型信息。

接着使用选择器实现合并逻辑:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>joined = transactions.join(addresses).where(new IdKeySelectorTransaction()).equalTo(new IdKeySelectorAddress()).collect();assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

排序

首先准备一些实例数据,Tuple2类型集合:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(fourthPerson, secondPerson, thirdPerson, firstPerson);

如何需要按Tuple2中第一个字段进行排序,需要使用sortPartition方法执行转换:

List<Tuple2<Integer, String>> sorted = transactions.sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING).collect();assertThat(sorted).containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

经典示例

单词计数是现实大数据处理框架的经典示例,主要对数据文本的内容处理计算单词频数。本节提供Flink实现版本。首先创建LineSplitter 类分割输入为单词,收集每个单词的Tuple2类型(key-value), key即输入中发现的每个单词,value为常数1。

该类实现FlatMapFunction接口,它接收字符串作为输入,产生 Tuple2<String, Integer>作为输出:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {Stream.of(value.toLowerCase().split("\\W+")).filter(t -> t.length() > 0).forEach(token -> out.collect(new Tuple2<>(token, 1)));}
}

然后调用Collector类的collect方法,推送数据至处理流水线。接着按第一个元素(单词)对元组进行分组并执行sum聚集方法对元组的第二个元素进行求和计算单词的频数。

public static DataSet<Tuple2<String, Integer>> startWordCount(ExecutionEnvironment env, List<String> lines) throws Exception {DataSet<String> text = env.fromCollection(lines);return text.flatMap(new LineSplitter()).groupBy(0).aggregate(Aggregations.SUM, 1);
}

我们使用了三种Flink转换类型:flatMap(), groupBy() 和 aggregate()。下面写完整测试是否与期望一致:

List<String> lines = Arrays.asList("This is a first sentence","This is a second sentence with a one word");DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);List<Tuple2<String, Integer>> collect = result.collect();assertThat(collect).containsExactlyInAnyOrder(new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

DataStream API 转换数据

创建DataStream

Apache Flink 通过DataStream API支持事件流处理。首先需要使用StreamExecutionEnvironment 类消费事件:

StreamExecutionEnvironment executionEnvironment= StreamExecutionEnvironment.getExecutionEnvironment();

接着使用executionEnvironment从不同来源创建事件流,它可以是消息总线,如Apache Kafka,但我们简单创建一组字符串元素:

DataStream<String> dataStream = executionEnvironment.fromElements("This is a first sentence", "This is a second sentence with a one word");

和DataSet类一样,可以对DataStream中的元素应用转换:

SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);

为了触发执行,需要执行sink操作,如print()方法,把转换结果打印至控制台,接着执行StreamExecutionEnvironment 类的execute方法:

upperCase.print();
env.execute();

程序会产生下面输出结果:

1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD

窗口事件

当实时处理事件流时,可能需要把一些事件分为组,基于这些事件窗口进行计算。
假设事件流中每个事件发送至我们系统中,其中包括事件量和时间戳。我们可以容许事件无序到达,但前提是它们的延迟不超过20秒。对于这种场景首先创建一个流来模拟两个相隔几分钟的事件,并定义一个时间戳提取器来指定延迟阈值:

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed= env.fromElements(new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Integer, Long>>(Time.seconds(20)) {@Overridepublic long extractTimestamp(Tuple2<Integer, Long> element) {return element.f1 * 1000;}});

接下来定义一个窗口操作,将事件分组到5秒的窗口中,并对这些事件应用转换:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).maxBy(0, true);
reduced.print();

它将获得每5秒窗口的最后一个元素,因此它输出:

1> (15,1491221519)

请注意,我们没有看到第二个事件,因为它的到达时间晚于指定的延迟阈值。

总结

本文简要介绍了Apache Flink框架,并通过示例展示如何使用一些转换API,包括利用DataSet API实现单词频次计算,利用DataStream API 实现简单实时事件流转换。

Flink快速入门教程相关推荐

  1. BIML 101 - ETL数据清洗 系列 - BIML 快速入门教程 - 序

    BIML 101 - BIML 快速入门教程 做大数据的项目,最花时间的就是数据清洗. 没有一个相对可靠的数据,数据分析就是无木之舟,无水之源. 如果你已经进了ETL这个坑,而且预算有限,并且有大量的 ...

  2. ​HealthKit开发快速入门教程大学霸内部教程

    ​HealthKit开发快速入门教程大学霸内部教程 ​ ​ 国内第一本HealthKit专向教程.本教程详细讲解iOS中,如何使用HealthKit框架开发健康应用.最后,本教程结合HealthKit ...

  3. Apple Watch开发快速入门教程

     Apple Watch开发快速入门教程  试读下载地址:http://pan.baidu.com/s/1eQ8JdR0 介绍:苹果为Watch提供全新的开发框架WatchKit.本教程是国内第一本A ...

  4. 指示灯组与3个复位按钮的介绍Arduino Yun快速入门教程

    指示灯组与3个复位按钮的介绍Arduino Yun快速入门教程 ​1.4.2  指示灯组 指示灯组的放大图如图1.5所示. 图1.5  指示灯组 各个指示灯对应的功能如下: q  RX:对应于0号端口 ...

  5. 游戏控制杆OUYA游戏开发快速入门教程

    游戏控制杆OUYA游戏开发快速入门教程 1.2.2  游戏控制杆 游戏控制杆各个角度的视图,如图1-4所示,它的硬件规格是本文选自OUYA游戏开发快速入门教程大学霸: 图1-4  游戏控制杆各个角度的 ...

  6. Arduino Yun的主要部件介绍选自Arduino Yun快速入门教程

    Arduino Yun的主要部件介绍 1.4.1  主要部件 Yun的主要部件如图1.4所示. 图1.4  Arduino Yun的主要部件 在Yun小小的板子上集成了两颗处理器.一个是ATmega3 ...

  7. 认识AndEngine选自Android 2D游戏引擎AndEngine快速入门教程

    认识AndEngine什么是AndEngine 随着Android手机.平板的盛行,Android下的游戏也不断的变得火热.而对于游戏开发有兴趣的同学们,应该也想要学习开发游戏.虽说游戏开发的引擎较多 ...

  8. OUYA游戏开发快速入门教程1.2OUYA的硬件规格

    OUYA游戏开发快速入门教程1.2OUYA的硬件规格 从官网上购买回来的OUYA产品,包含游戏主机.游戏控制杆.说明书.电源线.HDMI线.电源线和电池,如图1-2所示.本节就来简要介绍下,游戏主机和 ...

  9. Android 2D游戏引擎AndEngine快速入门教程

    Android 2D游戏引擎AndEngine快速入门教程 介绍:AndEngine是一款知名的Android 2D游戏引擎.该引擎代码开源,并且可以免费使用.本书详细讲解如何使用AndEngine引 ...

最新文章

  1. 逻辑回归算法原理简介
  2. undefined reference to 'WinMain@16' 的四种情况
  3. UA MATH636 信息论9 Berlekamp-Welch算法
  4. shutter 无法设置快捷键的解决方法
  5. WebStorm 和 VsCode 的结合体来了!
  6. 接口实例(C#,IShape)【C#】
  7. esp8266时钟_ESP8266(Non-OS SDK) 驱动 waveshare 2.9 寸墨水屏(二)- 程序移植、修改与测试
  8. 一張表的數據導入到另一張表
  9. 防火墙、WAF、IPS、IDS都是什么
  10. 扩展欧几里德算法的定义、解释、证明及其应用
  11. D3DXIntersectTri 求三角形与射线相交
  12. Easy machine learning pipelines with pipelearner: intro and call for contributors
  13. 如何避免计算机被别人共享,win7如何防止别人偷窥电脑 win7防止别人偷窥电脑操作方法...
  14. (Excel)常用函数公式及操作技巧之六:汇总计算与统计(一)
  15. 香浓熵(Shannon)与冯诺伊曼熵(Von Neumann)
  16. 创业者、如何可以投资小,又可以依附互联网去创业?
  17. 学习Linux/Unix这么久了,你真的知道什么是终端吗?
  18. JS学习笔记 - Extends
  19. SQL必知必会(一)SQL基础篇
  20. 百度收录 百度收录有什么好方法吗

热门文章

  1. 揭开ESP8266神秘的面纱
  2. 判断一个人的年龄是否满足18岁
  3. HyperLedger FabricV2.3 Raft单机集群部署
  4. 少儿编程值得报班学习吗?别问了,程序员懵了
  5. Zemax学习笔记——多重结构使用方法
  6. 我的CSDN 2007 MVB 最有价值BLOG 奖杯碎了,心疼
  7. wps2019数据分析加载项_WPS加载项——集成创新 体验升级
  8. 超低噪声放大器DN-AP06
  9. 高科路由器有虚拟服务器设置吗,高科路由器怎么设置 高科无线路由器安装设置教程-路由器知识...
  10. Open-falcon 通过docker方式进行安装部署