目录

初识Flink

Flink设计理念

Flink的应用

Flink在企业中的应用

Flink的主要应用场景

流式数据处理的发展和演变

流处理和批处理

传统事务处理

有状态的流处理

Lambda 架构

新一代流处理器

Flink特性总结

Flink 的核心特性

分层API

Flink快速上手

创建项目

编写代码

批处理

流处理

流处理读取文本流


第一章 初识Flink

Flink 是 Apache 基金会旗下的一个开源大数据处理框架。目前,Flink 已经成为各大公司大数据实时处理的发力重点,特别是国内以阿里为代表的一众互联网大厂都在全力投入,为 Flink 社区贡献了大量源码。

Flink设计理念

具体定位是:Apache Flink 是一个框架和分布式处理引擎,如图 1-2 所示,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

这里有很多专业词汇,我们从中至少可以提炼出一些容易理解的信息:Flink 是一个“框架”,是一个数据处理的“引擎”;既然是“分布式”,当然是为了应付大规模数据的应用场景了;另外,Flink 处理的是数据流。所以,Flink 是一个流式大数据处理引擎。 而“内存执行速度”和“任意规模”,突出了 Flink 的两个特点:速度快、可扩展性强——这说的自然就是小松鼠的“快速”和“灵巧”了。

Flink的应用

Flink 是一个大数据流处理引擎,它可以为不同的行业提供大数据实时处理的解决方案。随着 Flink 的快速发展完善,如今在世界范围许多公司都可以见到 Flink 的身影。

目前在全球范围内,北美、欧洲和金砖国家均是 Flink 的应用热门区域。当然,这些地区其实也就是 IT、互联网行业较发达的地区。

Flink 在国内热度尤其高,一方面是因为阿里的贡献和带头效应,另一方面也跟中国的应用场景密切相关。中国的人口规模与互联网使用普及程度,决定了对大数据处理的速度要求越来越高,也迫使中国的互联网企业去追逐更高的数据处理效率。试想在中国,一个网站可能要面对数亿的日活用户、每秒数亿次的计算峰值,这对很多国外的公司来说是无法想象的。而Flink 恰好给我们高速准确的处理海量流式数据提供了可能。

Flink在企业中的应用

Flink 为全球许多公司和企业的关键业务应用提供了强大的支持。

对于数据处理而言,任何行业、任何公司的需求其实都是一样的:数据规模大、实时性要求高、确保结果准确、方便扩展、故障后可恢复——而这些要求,作为新一代大数据流式处理引擎的 Flink 统统可以满足!这也正是 Flink 在全世界范围得到广泛应用的原因。

以下是 Flink 官网列出的知名企业用户,如图 1-3 所示,他们在生产环境中有各种各样有趣的应用。

Flink的主要应用场景

Flink是一个大数据流式处理引擎,处理的是流式数据,也就是“数据流”(Data Flow)。顾名思义,数据流的含义是,数据并不是收集好的,而是像水流一样,是一组有序的数据序列,逐个到来、逐个处理。由于数据来到之后就会被即刻处理,所以流处理的一大特点就是“快速”,也就是良好的实时性。Flink 适合的场景,其实也就是需要实时处理数据流的场景。

具体来看,一些行业中的典型应用有:

1.电商和市场营销

举例:实时数据报表、广告投放、实时推荐

在电商行业中,网站点击量是统计 PV、UV 的重要来源,也是如今“流量经济”的最主要数据指标。很多公司的营销策略,比如广告的投放,也是基于点击量来决定的。另外,在网站上提供给用户的实时推荐,往往也是基于当前用户的点击行为做出的。

网站获得的点击数据可能是连续且不均匀的,还可能在同一时间大量产生,这是典型的数据流。如果我们希望把它们全部收集起来,再去分析处理,就会面临很多问题:首先,我们需要很大的空间来存储数据;其次,收集数据的过程耗去了大量时间,统计分析结果的实时性就大大降低了;另外,分布式处理无法保证数据的顺序,如果我们只以数据进入系统的时间为准,可能导致最终结果计算错误。

我们需要的是直接处理数据流,而 Flink 就可以做到这一点。

2.物联网(IOT)

举例:传感器实时数据采集和显示、实时报警,交通运输业

物联网是流数据被普遍应用的领域。各种传感器不停获得测量数据,并将它们以流的形式传输至数据中心。而数据中心会将数据处理分析之后,得到运行状态或者报警信息,实时地显示在监控屏幕上。所以在物联网中,低延迟的数据传输和处理,以及准确的数据分析通常很关键。

交通运输业也体现了流处理的重要性。比如说,如今高铁运行主要就是依靠传感器检测数据,测量数据包括列车的速度和位置,以及轨道周边的状况。这些数据会从轨道传给列车,再从列车传到沿途的其他传感器;与此同时,数据报告也被发送回控制中心。因为列车处于高速行驶状态,因此数据处理的实时性要求是极高的。如果流数据没有被及时正确处理,调整意见和警告就不能相应产生,后果可能会非常严重。

3.物流配送和服务业

举例:订单状态实时更新、通知信息推送

在很多服务型应用中,都会涉及订单状态的更新和通知的推送。这些信息基于事件触发,不均匀地连续不断生成,处理之后需要及时传递给用户。这也是非常典型的数据流的处理。

4.银行和金融业

举例:实时结算和通知推送,实时检测异常行为

银行和金融业是另一个典型的应用行业。用户的交易行为是连续大量发生的,银行面对的是海量的流式数据。由于要处理的交易数据量太大,以前的银行是按天结算的,汇款一般都要隔天才能到账。所以有一个说法叫作“银行家工作时间”,说的就是银行家不仅不需要 996,甚至下午早早就下班了:因为银行需要早点关门进行结算,这样才能保证第二天营业之前算出准确的账。这显然不能满足我们快速交易的需求。在全球化经济中,能够提供 24 小时服务变得越来越重要。现在交易和报表都会快速准确地生成,我们跨行转账也可以做到瞬间到账,还可以接到实时的推送通知。这就需要我们能够实时处理数据流。

另外,信用卡欺诈的检测也需要及时的监控和报警。一些金融交易市场,对异常交易行为的及时检测可以更好地进行风险控制;还可以对异常登录进行检测,从而发现钓鱼式攻击,从而避免巨大的损失。

流式数据处理的发展和演变

我们已经了解,Flink 的主要应用场景,就是处理大规模的数据流。那为什么一定要用 Flink 呢?数据处理还有没有其他的方式?要解答这个疑惑,我们就需要先从流处理和批处理的概念讲起。

流处理和批处理

数据处理有不同的方式。

对于具体应用来说,有些场景数据是一个一个来的,是一组有序的数据序列,我们把它叫作“数据流”;而有些场景的数据,本身就是一批同时到来,是一个有限的数据集,这就是批量数据(有时也直接叫数据集)。

容易想到,处理数据流,当然应该“来一个就处理一个”,这种数据处理模式就叫作流处理;因为这种处理是即时的,所以也叫实时处理。与之对应,处理批量数据自然就应该一批读入、一起计算,这种方式就叫作批处理,也叫作离线处理。

那真实的应用场景中,到底是数据流更常见、还是批量数据更常见呢?

生活中,这两种形式的数据都有。比如我们日常发信息,可以一句一句地说,也可以写一大段一起发过去。一句一句的信息,就是一个一个的数据,它们构成的序列就是一个数据流;而一大段信息,是一组数据的集合,对应就是批量数据(数据集)。

当然,有经验的人都会知道,一句一句地发,你一言我一语,有来有往这才叫聊天;一大段信息直接砸过去,别人看着都眼晕,很容易就没下文了——如果是很重要的整篇内容(比如表白信),写成文档或者邮件发过去可能效果会更好。

所以我们看到,“聊天”这个生活场景,数据的生成、传递和接收处理,都是流式的;而 “写信”的场景,数据的生成尽管应该也是流式的(字总得一个个写),但我们可以把它们收集起来,统一传输、统一处理(当然我们还可以进一步较真:处理也是流式的,字得一个一个读)。

不论传输处理的方式是怎样的,数据的生成,一般都是流式的。

在 IT 应用场景中,这一点会体现得更加明显。企业的绝大多数应用程序,都是在不停地接收用户请求、记录用户行为和系统日志,或者持续接收采集到的状态信息。所以数据会在不同的时间持续生成,形成一个有序的数据序列——这就是典型的数据流。

所以流数据更真实地反映了我们的生活方式。真实场景中产生的,一般都是数据流。那处理数据流,就一定要用流处理的方式吗?

这个问题似乎问得有点无厘头。不过仔细一想就会发现,很多数据流的场景其实也可以用 “攒一批”的方式来处理。比如聊天,我们可以收到一条信息就回一条;也可以攒很多条一起回复。对于应用程序,也可以把要处理的数据先收集齐,然后才一并处理。

但是这样做的缺点也非常明显:数据处理不够及时,实时性变差了。流处理,是真正的即时处理,没有“攒批”的等待时间,所以会更快、实时性更好。

另外,在批处理的过程中,必须有一个固定的时间节点结束“攒批”的过程、开始计算。而数据流是连续不断、无休无止的,我们没有办法在某一时刻说:“好!现在收集齐所有数据了,我们可以开始分析了。”如果我们需要实现“持续计算”,就必须采用流处理的方式,来处理数据流。

很显然,对于流式数据,用流处理是最好、也最合理的方式。

但我们知道,传统的数据处理架构并不是这样。无论是关系型数据库、还是数据仓库,都倾向于先“收集数据”,然后再进行处理。为什么不直接用流处理的方式呢?这是因为,分布式批处理在架构上更容易实现。想想生活中发消息聊天的例子,我们就很容易理解了:如果来一条消息就立即处理,“微信秒回”,这样做一定会很受人欢迎;但是这要求自己必须时刻关注新消息,这会耗费大量精力,工作效率会受到很大影响。如果隔一段时间查一下新消息,做个 “批处理”,压力明显就小多了。当然,这样的代价就是可能无法及时处理有些消息,造成一定的后果。

想要弄清楚流处理的发展演变,我们先要了解传统的数据处理架构。

传统事务处理

IT 互联网公司往往会用不同的应用程序来处理各种业务。比如内部使用的企业资源规划(ERP)系统、客户关系管理(CRM)系统,还有面向客户的 Web 应用程序。这些系统一般都会进行分层设计:“计算层”就是应用程序本身,用于数据计算和处理;而“存储层”往往是传统的关系型数据库,用于数据存储,如图所示。

我们发现,这里的应用程序在处理数据的模式上有共同之处:接收的数据是持续生成的事件,比如用户的点击行为,客户下的订单,或者操作人员发出的请求。处理事件时,应用程序需要先读取远程数据库的状态,然后按照处理逻辑得到结果,将响应返回给用户,并更新数据库状态。一般来说,一个数据库系统可以服务于多个应用程序,它们有时会访问相同的数据库或表。

这就是传统的“事务处理”架构。系统所处理的连续不断的事件,其实就是一个数据流。而对于每一个事件,系统都在收到之后进行相应的处理,这也是符合流处理的原则的。所以可以说,传统的事务处理,就是最基本的流处理架构。

对于各种事件请求,事务处理的方式能够保证实时响应,好处是一目了然的。但是我们知道,这样的架构对表和数据库的设计要求很高;当数据规模越来越庞大、系统越来越复杂时,可能需要对表进行重构,而且一次联表查询也会花费大量的时间,甚至不能及时得到返回结果。于是,作为程序员就只好将更多的精力放在表的设计和重构,以及 SQL 的调优上,而无法专注于业务逻辑的实现了——我们都知道,这种工作费力费时,却没法直接体现在产品上给老板看,简直就是噩梦。

那有没有更合理、更高效的处理架构呢?

有状态的流处理

不难想到,如果我们对于事件流的处理非常简单,例如收到一条请求就返回一个“收到”,那就可以省去数据库的查询和更新了。但是这样的处理是没什么实际意义的。在现实的应用中,往往需要还其他一些额外数据。我们可以把需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。在传统架构中,这个状态就是保存在数据库里的。这就是所谓的“有状态的流处理”。

为了加快访问速度,我们可以直接将状态保存在本地内存,如图 1-6 所示。当应用收到一个新事件时,它可以从状态中读取数据,也可以更新状态。而当状态是从内存中读写的时候,这就和访问本地变量没什么区别了,实时性可以得到极大的提升。

另外,数据规模增大时,我们也不需要做重构,只需要构建分布式集群,各自在本地计算就可以了,可扩展性也变得更好。

因为采用的是一个分布式系统,所以还需要保护本地状态,防止在故障时数据丢失。我们可以定期地将应用状态的一致性检查点(checkpoint)存盘,写入远程的持久化存储,遇到故障时再去读取进行恢复,这样就保证了更好的容错性。

有状态的流处理是一种通用而且灵活的设计架构,可用于许多不同的场景。具体来说,有以下几种典型应用。

1.事件驱动型(Event-Driven)应用

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以 Kafka 为代表的消息队列几乎都是事件驱动型应用。

这其实跟传统事务处理本质上是一样的,区别在于基于有状态流处理的事件驱动应用,不再需要查询远程数据库,而是在本地访问它们的数据,如图 1-7 所示,这样在吞吐量和延迟方面就可以有更好的性能。

另外远程持久性存储的检查点保证了应用可以从故障中恢复。检查点可以异步和增量地完成,因此对正常计算的影响非常小。

2.数据分析(Data Analysis)型应用

所谓的数据分析,就是从原始数据中提取信息和发掘规律。传统上,数据分析一般是先将数据复制到数据仓库(Data Warehouse),然后进行批量查询。如果数据有了更新,必须将最新数据添加到要分析的数据集中,然后重新运行查询或应用程序。

如今,Apache Hadoop 生态系统的组件,已经是许多企业大数据架构中不可或缺的组成部分。现在的做法一般是将大量数据(如日志文件)写入 Hadoop 的分布式文件系统(HDFS)、 S3 或 HBase 等批量存储数据库,以较低的成本进行大容量存储。然后可以通过 SQL-on-Hadoop 类的引擎查询和处理数据,比如大家熟悉的 Hive。这种处理方式,是典型的批处理,特点是可以处理海量数据,但实时性较差,所以也叫离线分析。

如果我们有了一个复杂的流处理引擎,数据分析其实也可以实时执行。流式查询或应用程序不是读取有限的数据集,而是接收实时事件流,不断生成和更新结果。结果要么写入外部数据库,要么作为内部状态进行维护。

Apache Flink 同事支持流式与批处理的数据分析应用,如图 1-8 所示。

与批处理分析相比,流处理分析最大的优势就是低延迟,真正实现了实时。另外,流处理不需要去单独考虑新数据的导入和处理,实时更新本来就是流处理的基本模式。当前企业对流式数据处理的一个热点应用就是实时数仓,很多公司正是基于 Flink 来实现的。

3. 数据管道(Data Pipeline)型应用

ETL 也就是数据的提取、转换、加载,是在存储系统之间转换和移动数据的常用方法。在数据分析的应用中,通常会定期触发 ETL 任务,将数据从事务数据库系统复制到分析数据库或数据仓库。

所谓数据管道的作用与 ETL 类似。它们可以转换和扩展数据,也可以在存储系统之间移动数据。不过如果我们用流处理架构来搭建数据管道,这些工作就可以连续运行,而不需要再去周期性触发了。比如,数据管道可以用来监控文件系统目录中的新文件,将数据写入事件日志。连续数据管道的明显优势是减少了将数据移动到目的地的延迟,而且更加通用,可以用于更多的场景。

有状态的流处理架构上其实并不复杂,很多用户基于这种思想开发出了自己的流处理系统,这就是第一代流处理器。Apache Storm 就是其中的代表。Storm 可以说是开源流处理的先锋,最早是由 Nathan Marz 和创业公司 BackType 的一个团队开发的,后来才成为 Apache 软件基金会下属的项目。Storm 提供了低延迟的流处理,但是它也为实时性付出了代价:很难实现高吞吐,而且无法保证结果的正确性。用更专业的话说,它并不能保证“精确一次”

(exactly-once);即便是它能够保证的一致性级别,开销也相当大。关于状态一致性和 exactly-once,我们会在后续的章节中展开讨论。

Lambda 架构

对于有状态的流处理,当数据越来越多时,我们必须用分布式的集群架构来获取更大的吞吐量。但是分布式架构会带来另一个问题:怎样保证数据处理的顺序是正确的呢?

对于批处理来说,这并不是一个问题。因为所有数据都已收集完毕,我们可以根据需要选择、排列数据,得到想要的结果。可如果我们采用“来一个处理一个”的流处理,就可能出现

“乱序”的现象:本来先发生的事件,因为分布处理的原因滞后了。怎么解决这个问题呢?以 Storm 为代表的第一代分布式开源流处理器,主要专注于具有毫秒延迟的事件处理,特点就是一个字“快”;而对于准确性和结果的一致性,是不提供内置支持的,因为结果有可能取决于到达事件的时间和顺序。另外,第一代流处理器通过检查点来保证容错性,但是故障恢复的时候,即使事件不会丢失,也有可能被重复处理——所以无法保证 exactly-once。

与批处理器相比,可以说第一代流处理器牺牲了结果的准确性,用来换取更低的延迟。而批处理器恰好反过来,牺牲了实时性,换取了结果的准确。

我们自然想到,如果可以让二者做个结合,不就可以同时提供快速和准确的结果了吗?正是基于这样的思想,Lambda 架构被设计出来。我们可以认为这是第二代流处理架构,但事实上,它只是第一代流处理器和批处理器的简单合并。

Lambda 架构主体是传统批处理架构的增强。它的“批处理层”(Batch Layer)就是由传统的批处理器和存储组成,而“实时层”(Speed Layer)则由低延迟的流处理器实现。数据到达之后,两层处理双管齐下,一方面由流处理器进行实时处理,另一方面写入批处理存储空间,等待批处理器批量计算。流处理器快速计算出一个近似结果,并将它们写入“流处理表”中。而批处理器会定期处理存储中的数据,将准确的结果写入批处理表,并从快速表中删除不准确的结果。最终,应用程序会合并快速表和批处理表中的结果,并展示出来。

Lambda 架构现在已经不再是最先进的,但仍在许多地方使用。它的优点非常明显,就是兼具了批处理器和第一代流处理器的特点,同时保证了低延迟和结果的准确性。而它的缺点同样非常明显。首先,Lambda 架构本身就很难建立和维护;而且,它需要我们对一个应用程序,做出两套语义上等效的逻辑实现,因为批处理和流处理是两套完全独立的系统,它们的 API 也完全不同。为了实现一个应用,付出了双倍的工作量,这对程序员显然不够友好。

新一代流处理器

之前的分布式流处理架构,都有明显的缺陷,人们也一直没有放弃对流处理器的改进和完善。终于,在原有流处理器的基础上,新一代分布式开源流处理器诞生了。为了与之前的系统区分,我们一般称之为第三代流处理器,代表当然就是 Flink。

第三代流处理器通过巧妙的设计,完美解决了乱序数据对结果正确性的影响。这一代系统还做到了精确一次(exactly-once)的一致性保障,是第一个具有一致性和准确结果的开源流处理器。另外,先前的流处理器仅能在高吞吐和低延迟中二选一,而新一代系统能够同时提供这两个特性。所以可以说,这一代流处理器仅凭一套系统就完成了 Lambda 架构两套系统的工作,它的出现使得 Lambda 架构黯然失色。

除了低延迟、容错和结果准确性之外,新一代流处理器还在不断添加新的功能,例如高可用的设置,以及与资源管理器(如 YARN 或 Kubernetes)的紧密集成等等。

Flink特性总结

Flink是第三代分布式流处理器

Flink 的核心特性

Flink区别于传统数据处理框架的特性如下:

- 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。

- 结果的准确性。Flink 提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。

- 精确一次(exactly-once)的状态一致性保证。

- 可以连接到最常用的存储系统,如 Apache Kafka、Apache Cassandra、Elasticsearch、

JDBC、Kinesis 和(分布式)文件系统,如 HDFS 和 S3。

- 高可用。本身高可用的设置,加上与 K8s,YARN 和 Mesos 的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink 能做到以极少的停机时间 7×24 全天候运行。

- 能够更新应用程序代码并将作业(jobs)迁移到不同的 Flink 集群,而不会丢失应用程序的状态。

分层API

除了以上特性外 Flink还是一个非常易于开发的框架,因为它拥有易于使用的分层API

最底层级的抽象仅仅提供了有状态流,它将处理函数(Process Function)嵌入到了 DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,它允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。

实际上,大多数应用并不需要上述的底层抽象,而是直接针对核心 API(Core APIs) 进行编程,比如 DataStream API(用于处理有界或无界流数据)以及 DataSet API(用于处理有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换

(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些 API 处理的数据类型以类(classes)的形式由各自的编程语言所表示。

Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。Table API 遵循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、join、group-by、aggregate 等。

尽管 Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展,仍不如核心 API 更具表达能力,但是使用起来代码量更少,更加简洁。除此之外,Table API 程序在执行之前会使用内置优化器进行优化。

我们可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API DataStream 以及 DataSet 混合使用。

Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

目前 Flink SQL 和 Table API 还在开发完善的过程中,很多大厂都会二次开发符合自己需要的工具包。而 DataSet 作为批处理 API 实际应用较少,2020 年 12 月 8 日发布的新版本 1.12.0, 已经完全实现了真正的流批一体,DataSet API 已处于软性弃用(soft deprecated)的状态。用 Data Stream API 写好的一套代码, 即可以处理流数据, 也可以处理批数据,只需要设置不同的执行模式。这与之前版本处理有界流的方式是不一样的,Flink 已专门对批处理数据做了优化处理。

第二章 Flink快速上手

创建项目

1.创建工程

在idea中创建一个maven工程,将工程名命名为FlinkTutorial

2.添加相关依赖

 <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.0</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!-- 引入Flink相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId>        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- 引入日志管理相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies>

3.配置日志管理

在src/main/resource目录下添加log4j.properties文件,内容如下

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

编写代码

搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的示例来说明 Flink 代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的

WordCount 程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。

我们的源码位于 src/main/java 目录下。首先新建一个包,命名为 com.atguigu.wc,在这个包下我们将编写 Flink 入门的 WordCount 程序。

我们已经知道,尽管 Flink 自身的定位是流式处理引擎,但它同样拥有批处理的能力。所以接下来,我们会针对不同的处理模式、不同的输入数据形式,分别讲述 WordCount 代码的实现。

批处理

对于批处理而言,输入的应该是收集好的数据集。这里我们将要统计的文字写入一个文档,然后读取文档处理数据

(1)在工厂根目录下创建input文件夹,创建文本文件words.txt

(2)在文本中添加文字,如:

hello world
hello flink
hello java

(3)新建java类 BatchWordCount,编写测试代码

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;//批处理
public class BatchWordCount {public static void main(String[] args) throws Exception {//创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//从文件中读取数据DataSource<String> lineDataSource = env.readTextFile("input/words.txt");//将每行数据进行分词  然后转换成二元组类型FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {//将一行的文本分词String[] words = line.split(" ");//将每个单词转换成二元组输出for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));//按照word进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);//分组内进行聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);sum.print();}
}

代码说明和注意事项:

  • Flink 在执行应用程序前应该获取执行环境对象,也就是运行时上下文环境。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  • Flink 同时提供了 Java 和 Scala 两种语言的 API,有些类在两套 API 中名称是一样的。

所以在引入包时,如果有 Java 和 Scala 两种选择,要注意选用 Java 的包。

  • 直接调用执行环境的 readTextFile 方法,可以从文件中读取数据。
  • 我们的目标是将每个单词对应的个数统计出来,所以调用 flatmap 方法可以对一行文字进行分词转换。将文件中每一行文字拆分成单词后,要转换成(word,count)形式的二元组,初始 count 都为 1。returns 方法指定的返回数据类型 Tuple2,就是 Flink 自带的二元组数据类型。
  • 在分组时调用了 groupBy 方法,它不能使用分组选择器,只能采用位置索引或属性名

    称进行分组。

// 使用索引定位
dataStream.groupBy(0)
// 使用类属性名称
dataStream.groupBy("id")
  • 在分组之后调用 sum 方法进行聚合,同样只能指定聚合字段的位置索引或属性名称

(4)运行

可以看到,我们将文档中的所有单词的频次,全部统计出来,以二元组的形式在控制台打印输出了。

需要注意的是,这种代码的实现方式,是基于 DataSet API 的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。所以从 Flink 1.12 开始,官方推荐的做法

是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只要维护一套 DataStream API 就可以了。这里只是为了方便大家理解,我们依然用 DataSet API 做了批处理的实现。

流处理

我们已经知道,用 DataSet API 可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于 Flink 而言,流才是整个处理逻辑的底层核心,所以流批统一之后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

DataStream API 作为“数据流”的处理接口,又怎样处理批数据呢?

回忆一下上一章中我们讲到的 Flink 世界观。在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流。所以批处理,其实就可以看作有界流的处理。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的

——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。

下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取文件DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");//转换数据格式SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS.flatMap((String line, Collector<String> words) -> {Arrays.stream(line.split(" ")).forEach(words::collect);}).returns(Types.STRING).map(word -> Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//分组KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);//求和SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKS.sum(1);//打印sum.print();//执行env.execute();}
}

与批处理程序的不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment。
  • 每一步处理转换之后,得到的数据对象类型不同。
  • 分组操作调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的 key 是什么。
  • 代码末尾需要调用 env 的 execute 方法,开始执行任务。

我们可以看到,这与批处理的结果是完全不同的。批处理针对每个单词,只会输出一个最终的统计个数;而在流处理的打印结果中,“hello”这个单词每出现一次,都会有一个频次统计数据输出。这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次。我们通过打印结果,可以清晰地看到单词“hello”数量增长的过程。

看到这里大家可能又会有新的疑惑:我们读取文件,第一行应该是“hello flink”,怎么这里输出的第一个单词是“world”呢?每个输出的结果二元组,前面都有一个数字,这又是什么呢?

我们可以先做个简单的解释。Flink 是一个分布式处理引擎,所以我们的程序应该也是分布式运行的。在开发环境里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字,其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的问题也就解决了:既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。

另外需要说明,这里显示的编号为 1~4,是由于运行电脑的 CPU 是 4 核,所以默认模拟的并行线程有 4 个。这段代码不同的运行环境,得到的结果会是不同的。关于 Flink 程序并行执行的数量,可以通过设定“并行度”(Parallelism)来进行配置,我们会在后续章节详细讲解这些内容。

流处理读取文本流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要保持一个监听事件的状态,持续地处理捕获的数据。

为了模拟这种场景,我们就不再通过读取文件来获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过的单词的个数。具体实现上,我们只要对

BoundedStreamWordCount 代码中读取数据的步骤稍做修改,就可以实现对真正无界流的处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class StreamWordCount {public static void main(String[] args) throws Exception {//创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取文本流DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 7777);//转换数据格式SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS.flatMap((String line, Collector<String> words) -> {Arrays.stream(line.split(" ")).forEach(words::collect);}).returns(Types.STRING).map(word -> Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//分组KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);//求和SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKS.sum(1);//打印sum.print();//执行env.execute();}
}

代码说明和注意事项:

  1. socket 文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码中指定了主机“hadoop102”的 7777 端口作为发送数据的 socket 端口,读者可以根据测试环境自行配置。
  2. 在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者传入程序运行参数的方式来指定。
  3. socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。

(2)在 Linux 环境的主机 hadoop102 上,执行下列命令,发送数据进行测试:

[atguigu@hadoop102 ~]$ nc -lk 7777

(3)启动 StreamWordCount 程序

我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从 hadoop102 发送数据:

hello flink hello world hello java

控制台输出结果:

我们会发现,输出的结果与之前读取文件的流处理非常相似。而且可以非常明显地看到,每输入一条数据,就有一次对应的输出。具体对应关系是:输入“hello flink”,就会输出两条统计结果(flink,1)和(hello,1);之后再输入“hello world”,同样会将 hello 和 world 的个数统计输出,hello 的个数会对应增长为 2。

Flink(初识Flink,快速上手)相关推荐

  1. 【Flink】Flink Serving 天池快速上手 【视频笔记】

    1.概述 未看完

  2. flink sql udf jar包_Flink 生态:一个案例快速上手 PyFlink

    简介: Flink 从 1.9.0 版本开始增加了对 Python 的支持(PyFlink),在刚刚发布的 Flink 1.10 中,PyFlink 添加了对 Python UDFs 的支持,现在可以 ...

  3. flink笔记1(初识 Flink)

    flink 一.初识 Flink 1.概念 2. Flink 的应用 (1)Flink 主要的应用场景 3.流式数据处理的发展和演变 (1)流处理和批处理 (2)传统事务处理 (3)有状态的流处理 ( ...

  4. Flink1.13学习_第 1 章 初识 Flink

    Flink 是 Apache 基金会旗下的一个开源大数据处理框架.目前,Flink 已经成为各大公司大数据实时处理的发力重点,特别是国内以阿里为代表的一众互联网大厂都在全力投入,为Flink 社区贡献 ...

  5. 初识Flink 原理介绍、发展由来。

    Apache Flink是什么? 在当代数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.随着雅虎对hadoop的开源 ...

  6. 大数据基石-Hadoop3.x学习教程-Hadoop产品了解与快速上手

    大数据基石-Hadoop Hadoop3.x版本全系列教程 === 楼兰 === 文章目录 一.关于Hadoop 1.关于Hadoop产品 2.Hadoop课程内容 3.Hadoop的主要组件 二.H ...

  7. flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  8. centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  9. Python学习笔记---day02快速上手

    day02快速上手 课程目标:学习Python最基础的语法知识,可以用代码快速实现一些简单的功能 课程概要: 初识编码(密码本) 编程体验 输出 初识数据类型 变量 注释 输入 条件语句 1. 编码 ...

最新文章

  1. 【全网唯一】全网唯一能够跑通的,跑不通你来找我~用node.js完成微信支付下单功能,且只需要一个文件wxpay.js就解决业务流程的node.js程序
  2. bzoj2961 共点圆 (CDQ分治, 凸包)
  3. gocron - 定时任务管理系统
  4. 怎么在搭建Android开发环境?
  5. Elasticsearch之kopf插件安装之后的浏览详解
  6. 在IBM服务器安装Windows server 2012的心得
  7. 2021年全球营销趋势报告
  8. C++笔记-断言、静态断言、R转义符
  9. C#是一种垂死的语言吗?
  10. python小飞机程序
  11. xjad反编译遇到break MISSING_BLOCK_LABEL_365问题
  12. java案例2-7 随机抽取幸运观众
  13. 为什么从Java开发转测试?
  14. uniapp的分享到朋友圈和朋友(APP)
  15. Hadoop单节点设置
  16. java爬虫爬取B站弹幕
  17. oracle lms进程 内存,lms进程耗用大量内存
  18. PhotoShop CS6实现照片背景虚化效果
  19. 计算机网络有客户 服务器和对等模式,四种网络工作模式有:对等模式、客户/服务器模式以及( )、( ),...
  20. 通过Nginx访问静态页面

热门文章

  1. 多元回归函数regress的用法
  2. 社工小组 计算机小组活动,小组工作活动计划1
  3. mfc入门基础(四)对话框添加控件、创建对话框类和为对话框控件添加变量
  4. Python网络编程(OSI Socket)
  5. Android Excel(xls,xlsx)表格数据简单生成和读取尝试
  6. 测试之美(2)对测试的几点理解----谁是利益相关者?
  7. 六大学习趋势正重塑在线教育产业-网络线上教学
  8. 为什么计算机屏幕出现黄色,电脑为什么会出现显示器屏幕发黄
  9. 茁壮浏览器 android,傲游浏览器六一纯真献礼 过个别开生面的儿童节
  10. .net开发网站CMS博客框架