Flink DataStream API Programming Guide Flink DataStream API编程指南

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to files, or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.

In order to create your own Flink DataStream program, we encourage you to start with anatomy of a Flink Program and gradually add your own stream transformations. The remaining sections act as references for additional operations and advanced features.
为了创建你自己的Flink DataStream程序,我们鼓励您从anatomy of a Flink Program章节开始,逐步添加自己的流转换。其余章节作为其他操作和高级功能的参考。

What is a DataStream? DataStream是什么?

The DataStream API gets its name from the special DataStream class that is used to represent a collection of data in a Flink program. You can think of them as immutable collections of data that can contain duplicates. This data can either be finite or unbounded, the API that you use to work on them is the same.
DataStream API从特殊的DataStream类(被用于表示Flink程序中的数据集合)中获取其名称。您可以将它们(DataStream)视为可以包含重复项的不可变数据集合。包含数据可以是有限的,也可以是无限的,用于处理它们的API是相同的。

A DataStream is similar to a regular Java Collection in terms of usage but is quite different in some key ways. They are immutable, meaning that once they are created you cannot add or remove elements. You can also not simply inspect the elements inside but only work on them using the DataStream API operations, which are also called transformations.
DataStream在使用方面类似于常规Java集合,但在某些关键方面有很大不同。它们是不可变的,这意味着一旦创建它们,就不能添加或删除元素,也不能简单地查看它们内部的元素,但可以使用DataStream API(也称为transformation)处理它们。

You can create an initial DataStream by adding a source in a Flink program. Then you can derive new streams from this and combine them by using API methods such as map, filter, and so on.

Anatomy of a Flink Program Flink程序剖析

Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts:

  1. Obtain an execution environment, 获得执行环境,
  2. Load/create the initial data, 加载/创建初始数据,
  3. Specify transformations on this data, 指定对该数据的转换,
  4. Specify where to put the results of your computations, 指定将计算结果放在哪里,
  5. Trigger the program execution 触发程序执行

We will now give an overview of each of those steps, please refer to the respective sections for more details. Note that all core classes of the Java DataStream API can be found in org.apache.flink.streaming.api .
我们现在将概述这些步骤中的每一步,请参阅相应的章节以了解更多详细信息。注意,可以在org.apache.flink.streaming.api中找到Java DataStream API的所有核心类。

The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

getExecutionEnvironment();createLocalEnvironment();createRemoteEnvironment(String host, int port, String... jarFiles);

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources the execution environment has several methods to read from files using various methods: you can just read them line by line, as CSV files, or using any of the other provided sources. To just read a text file as a sequence of lines, you can use:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.readTextFile("file:///path/to/file");

This will give you a DataStream on which you can then apply transformations to create new derived DataStreams.

You apply transformations by calling methods on DataStream with a transformation functions. For example, a map transformation looks like this:

DataStream<String> input = ...;DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {return Integer.parseInt(value);}

This will create a new DataStream by converting every String in the original collection to an Integer.

Once you have a DataStream containing your final results, you can write it to an outside system by creating a sink. These are just some example methods for creating a sink:

writeAsText(String path);print();

Once you specified the complete program you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment the execution will be triggered on your local machine or submit your program for execution on a cluster.

The execute() method will wait for the job to finish and then return a JobExecutionResult, this contains execution times and accumulator results.

If you don’t want to wait for the job to finish, you can trigger asynchronous job execution by calling executeAsync() on the StreamExecutionEnvironment. It will return a JobClient with which you can communicate with the job you just submitted. For instance, here is how to implement the semantics of execute() by using executeAsync().

final JobClient jobClient = env.executeAsync();final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

That last part about program execution is crucial to understanding when and how Flink operations are executed. All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.

Example Program 示例程序

The following program is a complete, working example of streaming window word count application, that counts the words coming from a web socket in 5 second windows. You can copy & paste the code to run it locally.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WindowWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999).flatMap(new Splitter()).keyBy(value -> value.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);dataStream.print();env.execute("Window WordCount");}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word: sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}}

To run the example program, start the input stream with netcat first from a terminal:

nc -lk 9999

Just type some words hitting return for a new word. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).

Data Sources 数据源

Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based: 基于文件的

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.
    readTextFile(path) - 逐行读取符合TextInputFormat规范的文本文件,并将其作为字符串返回

  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.
    readFile(fileInputFormat,path) - 根据指定的文件输入格式读取(一次)文件

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.
    readFile(fileInputFormat、path、watchType、interval、pathFilter、typeInfo) - 这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取path中的文件。根据提供的watchType,此source可能会定期监视(每间隔ms)指定路径中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或处理当前路径中的数据一次,然后退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除正在处理的文件。


Under the hood, Flink splits the file reading process into two sub-tasks, namely directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the watchType), find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one.


  1. If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
  2. If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.

Socket-based: 基于套接字的

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
    socketTextStream - 从套接字读取。元素可以用分隔符分隔。

Collection-based: 基于集合的

  • fromCollection(Collection) - Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.
    fromCollection(Collection) - 从Java Java.util.Collection创建数据流。集合中的所有元素必须为同一类型。

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
    fromCollection(Iterator, Class) - 从迭代器创建数据流。class指定迭代器返回的元素的数据类型。

  • fromElements(T …) - Creates a data stream from the given sequence of objects. All objects must be of the same type.
    fromElements(T …) - 从给定的对象序列创建数据流。所有对象必须是同一类型。

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
    fromParallelCollection(SplittableIterator, Class) - 从迭代器创建并行数据流。class指定迭代器返回的元素的数据类型。

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.
    generateSequence(from, to) - 以给定范围的数字生成并行序列。

Custom: 用户自定义的

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(…)). See connectors for more details.
    addSource - 附加一个新的source函数。例如,要从Apache Kafka中读取数据,可以使用addSource(new FlinkKafkaConsumer<>(…))。有关详细信息,请参见连接器。

DataStream Transformations DataStream转换

Please see operators for an overview of the available stream transformations.

Data Sinks 数据接收器

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
    writeAsText() / TextOutputFormat - 将元素以字符串类型逐行写出。字符串是通过调用每个元素的toString()方法获得的。

  • writeAsCsv(…) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
    writeAsCsv(…) / CsvOutputFormat - 将元组的值以逗号分隔写入文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
    print() / printToErr() - 在标准输出/标准错误流中打印每个元素的toString()值。可选地,可以在输出之前提供前缀(msg) 。这有助于区分不同的打印调用。如果并行度大于1,则输出将会以生成输出的任务的编号作为前缀。

  • writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
    writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义对象到字节的转换。

  • writeToSocket - Writes elements to a socket according to a SerializationSchema
    writeToSocket - 根据SerializationSchema将元素写入socket

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
    addSink - 调用自定义sink函数。Flink与其他系统(如Apache Kafka)的连接器捆绑在一起,这些连接器被实现为sink函数。

Note that the write*() methods on DataStream are mainly intended for debugging purposes. They are not participating in Flink’s checkpointing, this means these functions usually have at-least-once semantics. The data flushing to the target system depends on the implementation of the OutputFormat. This means that not all elements send to the OutputFormat are immediately showing up in the target system. Also, in failure cases, those records might be lost.

For reliable, exactly-once delivery of a stream into a file system, use the StreamingFileSink. Also, custom implementations through the .addSink(…) method can participate in Flink’s checkpointing for exactly-once semantics.

Iterations 迭代

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a side output or a filter. Here, we show an example using filters. First, we define an IterativeStream

IterativeStream<Integer> iteration = input.iterate();

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);IterativeStream<Long> iteration = someIntegers.iterate();//定义迭代逻辑
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {@Overridepublic Long map(Long value) throws Exception {return value - 1 ;}
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long value) throws Exception {return (value > 0);}
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long value) throws Exception {return (value <= 0);}

Execution Parameters 执行参数

The StreamExecutionEnvironment contains the ExecutionConfig which allows to set job specific configuration values for the runtime.

Please refer to execution configuration for an explanation of most parameters. These parameters pertain specifically to the DataStream API:
有关大多数参数的说明,请参阅执行配置。这些参数特别适用于DataStream API:

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval()
    setAutoWatermarkInterval(long milliseconds):设置自动水印发射的间隔。可以使用getAutoWatermarkInterval()获取当前值

Fault Tolerance 容错

State & Checkpointing describes how to enable and configure Flink’s checkpointing mechanism.

Controlling Latency 控制延迟

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.


LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

Debugging 调试

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section give some hints how to ease the development of Flink programs.

Local Execution Environment 本地执行环境

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalEnvironment is created and used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();DataStream<String> lines = env.addSource(/* some source */);
// build your programenv.execute();

Collection Data Sources 集合数据源

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

Collection data sources can be used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);// Create a DataStream from an Iterator
Iterator<Long> longIt = ...;
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources can not be executed in parallel ( parallelism = 1).

Iterator Data Sink 迭代数据Sink

Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:

import org.apache.flink.streaming.experimental.DataStreamUtils;DataStream<Tuple2<String, Integer>> myResult = ...;
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);

Where to go next? 下一步去哪里?

  • Operators: Specification of available streaming operators.
    Operators: 可用流式operators的规范
  • Event Time: Introduction to Flink’s notion of time.
  • State & Fault Tolerance: Explanation of how to develop stateful applications.
  • Connectors: Description of available input and output connectors.

