Flink DataStream API Programming Guide

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating intermediate state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to a file system or to standard output (such as the command line terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines. In order to create your own streaming program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.

  • Example Program
  • Linking with Flink
  • Program Skeleton
  • DataStream Abstraction
  • Lazy Evaluation
  • Transformations
    • Physical partitioning
    • Task chaining and resource groups
  • Specifying Keys
  • Passing Functions to Flink
  • Data Types
  • Data Sources
  • Execution Configuration
  • Data Sinks
  • Debugging
    • Local Execution Environment
    • Collection Data Sources
  • Windows
    • Working with Time
    • Windows on Keyed Data Streams
    • Windows on Unkeyed Data Streams
  • Execution Parameters
    • Fault Tolerance
    • Parallelism
    • Controlling Latency
  • Working with State
    • Checkpointing Local Variables
    • Using the Key/Value State Interface
    • Stateful Source Functions
    • State Checkpoints in Iterative Jobs
  • Iterations
  • Connectors
    • Apache Kafka
    • Elasticsearch
    • Hadoop FileSystem
    • RabbitMQ
    • Twitter Streaming API
    • Docker containers for connectors
  • Program Packaging and Distributed Execution
  • Parallel Execution
  • Execution Plans

Example Program

The following is a complete, working example of a streaming windowed WordCount application that counts the words coming from a socket in 5-second windows. You can copy & paste the code to run it locally.

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.of(5, TimeUnit.SECONDS))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

To run the example program, first start the input stream with netcat from a terminal:

nc -lk 9999

Just type some words. These words will be the input to the example program. If you type the same word repeatedly within 5 seconds, its count will rise above 1 (increase the 5-second window size if you cannot type that fast ☺).


Linking with Flink

To write programs with Flink, you need to include the Flink DataStream library for your programming language in your project.

The simplest way to do this is to use one of the quickstart scripts: either for Java or for Scala. They create a blank project from a template (a Maven Archetype), which sets up everything needed for programming. To manually create the project with the archetype, you can run the following:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.0-SNAPSHOT

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.0-SNAPSHOT

The archetypes work for stable releases as well as the current preview version (-SNAPSHOT).

If you want to add Flink to an existing Maven project, add the following dependencies to your pom.xml file.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Program Skeleton

As presented in the example, Flink DataStream programs look like regular Java programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of those steps; please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration customConfiguration)

createRemoteEnvironment(String host, int port, String... jarFiles)
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program, it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method, and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.
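
If you do want to target a specific setup explicitly rather than relying on the context, the methods listed above can be called directly. A minimal sketch (the host name, port, and JAR path below are placeholders, not values from this guide):

// A minimal sketch: explicitly creating environments instead of relying on
// getExecutionEnvironment(). Host, port, and JAR path are placeholders.
StreamExecutionEnvironment localEnv =
        StreamExecutionEnvironment.createLocalEnvironment();          // runs inside this JVM

StreamExecutionEnvironment remoteEnv =
        StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123,                               // placeholder JobManager address
                "target/my-streaming-job.jar");                        // placeholder JAR containing the program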

The execution environment provides several methods to specify different data sources, including file systems, sockets, and external systems. To just read data from a socket (useful also for debugging), you can use:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> lines = env.socketTextStream("localhost", 9999);

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream, you can apply transformations to create a new DataStream, which you can then write back to a socket, transform again, combine with other DataStreams, or push to an external system (e.g., a message queue or a file system). You apply transformations by calling methods on DataStream, embedding your own custom functions into the transformation. For example, a map transformation looks like this:

DataStream<String> input = ...;

DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

This will create a new DataStream by converting every String in the original stream to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write it to a file, or print it.

writeAsText(String path, ...)
writeAsCsv(String path, ...)
writeToSocket(String hostname, int port, ...)

print()

addSink(...)

Once you have specified the complete program, you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the chosen execution environment, this will either execute on the local machine or submit the program for execution on a cluster.

env.execute();

The following is the same walkthrough, with the examples written in Scala.

As presented in the example, Flink DataStream programs look like regular Scala programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of those steps, please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

def getExecutionEnvironment

def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())

def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)

Typically, you only need to use getExecutionEnvironment, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources the execution environment has several methods to read from files, sockets, and external systems using various methods. To just read data from a socket (useful also for debugging), you can use:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val lines = env.socketTextStream("localhost", 9999)

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a file, transform again, combine with other DataStreams, or push to an external system. You apply transformations by calling methods on DataStream with your own custom transformation function. For example, a map transformation looks like this:

val input: DataStream[String] = ...

val mapped = input.map { x => x.toInt }

This will create a new DataStream by converting every String in the original set to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.

writeAsText(path: String, ...)
writeAsCsv(path: String, ...)
writeToSocket(hostname: String, port: Int, ...)

print()

addSink(...)

Once you specified the complete program you need to trigger the program execution by calling execute on StreamExecutionEnvironment. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.

env.execute()


DataStream Abstraction

A DataStream is a potentially unbounded collection of data elements of the same type.

Transformations may return different subtypes of DataStream, on which further transformations are available. For example, the keyBy(…) method returns a KeyedStream, which is a data stream that is logically partitioned by a key and on which windowed operations can be applied.


Lazy Evaluation

All Flink DataStream programs are executed lazily: when the program's main method runs, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program's execution plan. The plan is only executed once execute() is called on the StreamExecutionEnvironment, whether the program runs locally or on a cluster.

Lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
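
As a minimal sketch of this behavior (the source and transformation below are just placeholders for illustration), nothing is processed until execute() is reached:

// A minimal sketch of lazy evaluation: building the plan does not process any data yet.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> doubled = env
        .fromElements(1, 2, 3)                      // only declares a source
        .map(new MapFunction<Integer, Integer>() {  // only adds an operator to the plan
            @Override
            public Integer map(Integer value) {
                return 2 * value;
            }
        });

doubled.print();                                    // still nothing has been processed

env.execute("Lazy evaluation sketch");              // the whole plan runs here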

Transformations

Data transformations turn one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated topologies.

This section gives a description of all the available transformations.

Transformation Description
Map
DataStream → DataStream

Takes one element and produces one element (one in, one out). The following example is a map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...

dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements (the method returns void; the output depends on what is emitted via the Collector). The following example is a flatmap function that splits sentences into words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. The following example is a filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream. The example below shows how to specify the key.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value for the same key and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value (0 in the example below): combines the current element with the last folded value and emits the new value.

A fold function that creates a stream of partial sums:

keyedStream.fold(0, new FoldFunction<Integer, Integer>() {
    @Override
    public Integer fold(Integer accumulator, Integer value) throws Exception {
        return accumulator + value;
    }
});

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same applies to max and maxBy).

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data for each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See the dedicated windows section for a complete description.

dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

Windows can also be defined on regular (non-keyed) DataStreams. These windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds).

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data

(Window) Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to each window of a WindowedStream as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using the windowAll transformation above, you need to pass an AllWindowFunction instead of a WindowFunction.

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple,
                      Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});

(Window) Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

(Window) Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value.

windowedStream.fold(new Tuple2<String, Integer>("Sum of all", 0),
        new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Tuple2<String, Integer> value) throws Exception {
        return new Tuple2<String, Integer>(acc.f0, acc.f1 + value.f1);
    }
});

Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same applies to max and maxBy).

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

Union
DataStream* → DataStream

Union of two or more data streams, creating a new stream containing all the elements from all the streams.

Note: If you union a data stream with itself you will still only get each element once.

dataStream.union(otherStream1, otherStream2, ...);

Window Join
DataStream,DataStream → DataStream

Joins two data streams on a given key and a common window.

dataStream.join(otherStream).where(0).equalTo(1).window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))).apply (new JoinFunction () {...});

Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))).apply (new CoGroupFunction () {...});

Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams, retaining their types. Connecting allows shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap, but on a connected data stream.

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Split
DataStream → SplitStream

Splits the stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

Select
SplitStream → DataStream

Selects one or more streams from a split stream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow , by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();

DataStream<Long> iterationBody = iteration.map(/* do something */);

DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});

iteration.closeWith(feedback);

DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

Extract Timestamps
DataStream → DataStream

Extracts timestamps from the records in order to work with windows that use event time semantics. See working with time.

stream.assignTimestamps (new TimeStampExtractor() {...});

Transformation Description
Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }

FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap { str => str.split(" ") }

Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }

KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce { _ + _ }

Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that creates a stream of partial sums:

keyedStream.fold(0) { _ + _ }

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data

Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply { applyFunction }

Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce { _ + _ }

Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value.

windowedStream.fold(0) { _ + _ }

Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")

Union
DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will still only get each element once.

dataStream.union(otherStream1, otherStream2, ...)

Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

dataStream.join(otherStream).where(0).equalTo(1).window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))).apply { ... }

Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))).apply {}

Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

connectedStreams.map(
  (_ : Int) => true,
  (_ : String) => false
)

connectedStreams.flatMap(
  (_ : Int) => Seq(true),
  (_ : String) => Seq(false)
)

Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select
SplitStream → DataStream

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map { /*do something*/ }
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.

stream.assignTimestamps { timestampExtractor }

The following transformations are available on data streams of Tuples:

Transformation Description
Project
DataStream → DataStream

Selects a subset of fields from the tuples

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Transformation Description
Project
DataStream → DataStream

Selects a subset of fields from the tuples

val in : DataStream[(Int,Double,String)] = // [...]
val out = in.project(2,0)

Physical partitioning

Flink also gives low-level control (if desired) over the exact partitioning of the stream after a transformation, via the following functions.

Transformation Description
Hash partitioning
DataStream → DataStream

Identical to keyBy, but returns a DataStream instead of a KeyedStream.

dataStream.partitionByHash("someKey");
dataStream.partitionByHash(0);

Custom partitioning
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(new Partitioner(){...}, "someKey");
dataStream.partitionCustom(new Partitioner(){...}, 0);

Random partitioning
DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

dataStream.partitionRandom();

Rebalancing (Round-robin partitioning)
DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

dataStream.rebalance();

Broadcasting
DataStream → DataStream

Broadcasts each element to every partition.

dataStream.broadcast();

Transformation Description
Hash partitioning
DataStream → DataStream

Identical to keyBy but returns a DataStream instead of a KeyedStream.

dataStream.partitionByHash("someKey")
dataStream.partitionByHash(0)

Custom partitioning
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

Random partitioning
DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

dataStream.partitionRandom()

Rebalancing (Round-robin partitioning)
DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

dataStream.rebalance()

Broadcasting
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast()

Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.

Transformation Description
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining

Do not chain the map operator

someStream.map(...).disableChaining();

Start a new resource group

Start a new resource group containing the map and the subsequent operators.

someStream.filter(...).startNewResourceGroup();

Isolate resources

Isolate the operator in its own slot.

someStream.map(...).isolateResources();

Transformation Description
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...)

Disable chaining

Do not chain the map operator

someStream.map(...).disableChaining()

Start a new resource group

Start a new resource group containing the map and the subsequent operators.

someStream.filter(...).startNewResourceGroup()

Isolate resources

Isolate the operator in its own slot.

someStream.map(...).isolateResources()


Specifying Keys

The keyBy transformation requires that a key is defined on its argument DataStream.

A DataStream is keyed as

DataStream<...> input = // [...]
DataStream<...> windowed = input.keyBy(/*define key here*/).window(/*define window here*/);

The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data stream types into keys and values. Keys are “virtual”: they are defined as functions over the actual data to guide the grouping operator.
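
As one way to define such a virtual key, a minimal sketch using a KeySelector function (the Tuple2<String, Integer> element type is assumed from the earlier WordCount-style examples):

// A minimal sketch of a "virtual" key: the key is computed from the data by a function,
// here the String field of a Tuple2<String, Integer>.
DataStream<Tuple2<String, Integer>> words = //...

KeyedStream<Tuple2<String, Integer>, String> keyed = words
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) {
                return value.f0;   // group by the word itself
            }
        });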

See the relevant section of the DataSet API documentation on how to specify keys. Just replace DataSet with DataStream, and groupBy with keyBy.

Passing Functions to Flink

Some transformations take user-defined functions as arguments.

See the relevant section of the DataSet API documentation.


Data Types

Flink places some restrictions on the type of elements that are used in DataStreams and in results of transformations. The reason for this is that the system analyzes the types to determine efficient execution strategies.

See the relevant section of the DataSet API documentation.
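
For instance, tuples such as Tuple2<String, Integer> (addressed positionally, e.g. keyBy(0)) and POJOs (addressed by field name, e.g. keyBy("word")) are among the supported element types. A small sketch of a POJO element type; this class is hypothetical and only illustrates the usual requirements:

// A hypothetical POJO element type: a public no-argument constructor plus public fields
// (or getters/setters) lets Flink analyze the type and address fields by name.
public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}                         // required no-argument constructor

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}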


Data Sources

Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - create a stream by appending elements when there are changes to a file

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from the Java java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.
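
As a rough sketch of what a custom (non-parallel) source can look like, here is a hypothetical counting source; the class name and emission logic are made up for illustration:

// A hypothetical non-parallel source that emits an increasing counter until cancelled.
public class CountingSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            ctx.collect(counter++);   // emit the next element
            Thread.sleep(10);         // slow the sketch down a little
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Usage: DataStream<Long> numbers = env.addSource(new CountingSource());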

Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - create a stream by appending elements when there are changes to a file

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Seq) - Creates a data stream from the given Seq. All elements in the collection must be of the same type.

  • fromCollection(Iterator) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(elements: _*) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.


Execution Configuration

The StreamExecutionEnvironment also contains the ExecutionConfig which allows to set job specific configuration values for the runtime.

See the relevant section of the DataSet API documentation.

Parameters in the ExecutionConfig that pertain specifically to the DataStream API are:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval().


Data Sinks

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
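
For illustration only, a custom sink can be as simple as the following hypothetical sketch (the class name and output target are made up):

// A hypothetical custom sink that forwards each record's String form to some external target.
public class LoggingSink<T> implements SinkFunction<T> {

    @Override
    public void invoke(T value) throws Exception {
        // Replace this with a call to your external system (database, queue, ...).
        System.out.println("sink received: " + value);
    }
}

// Usage: dataStream.addSink(new LoggingSink<>());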

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.


Debugging

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section gives some hints on how to ease the development of Flink programs.

Local Execution Environment

LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalStreamEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalEnvironment is created and used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);

// build your program

env.execute();

val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)

// build your program

env.execute()

Collection Data Sources

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

Collection data sources can be used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources can not be executed in parallel ( parallelism = 1).


Windows

Working with Time

Time windows typically group events over a certain period of time. The notion of time used for this grouping is configurable; Flink currently supports three different kinds of time:

  • Processing time: Processing time is the time of the machine at the moment a transformation is executed. Processing time is the simplest notion of time and offers the best performance. However, in distributed and asynchronous environments machine clocks are often inconsistent, so processing time comes with a fair amount of non-determinism.

  • Event time: Event time is the time that each individual event occurred. This time is typically attached to the records before they enter Flink, or it can be extracted from a field of the record. With event time, out-of-order events can be handled properly. For example, an event with a timestamp of minute 10 that arrives at minute 12 will still be treated as belonging to minute 10, even when a 12-minute window is in play. Event time processing provides predictable results, but incurs more latency, as out-of-order events need to be buffered.

  • Ingestion time: Ingestion time is the time that an event enters Flink. More specifically, it is the wall-clock time of the machine that runs the source task at the moment the event is ingested, and that time is attached to the event. Ingestion time is more deterministic and predictable than processing time, and has lower latency than event time, as it does not depend on external systems. Ingestion time therefore offers a middle ground between the two. Ingestion time can be seen as a special case of event time; in fact, ingestion time and event time are handled identically inside Flink.

When using event time, transformations need to avoid waiting indefinitely for events to arrive. Watermarks provide the mechanism to control this event-time/latency trade-off. Watermarks are emitted by the sources. A watermark carries a certain timestamp t (a long value, e.g., corresponding to 2015-12-03 14:17:30), declaring that no events with a timestamp earlier than t will arrive after it.

You can choose the time semantics you need as follows:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

The default is TimeCharacteristic.ProcessingTime, so writing a program with processing-time semantics requires no further steps.

To write a program with event-time semantics, you need to do the following:

  • Set the time characteristic: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  • Use DataStream.assignTimestamps(...) to tell Flink how timestamps relate to events (e.g., which field of the record carries the timestamp).

  • Enable timestamps (enableTimestamps()) and, if desired, set the watermark emission interval (setAutoWatermarkInterval(long milliseconds)) in the ExecutionConfig.

For example, assume that we have a data stream of tuples in which the first field holds the timestamp (assigned by the system that generates the records, not by Flink), and we know that the lag between processing time and that timestamp is never more than 1 second:

DataStream<Tuple4<Long, Integer, Double, String>> stream = //...

stream.assignTimestamps(new TimestampExtractor<Tuple4<Long, Integer, Double, String>>() {
    @Override
    public long extractTimestamp(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0;
    }

    @Override
    public long extractWatermark(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0 - 1000;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
});

val stream: DataStream[(Long, Int, Double, String)] = null

stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1

  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000

  override def getCurrentWatermark: Long = Long.MinValue
})

If you know that the timestamps are strictly ascending (i.e., elements arrive in order), you can use the AscendingTimestampExtractor, and the system will generate the watermarks automatically:

DataStream<Tuple4<Long, Integer, Double, String>> stream = //...

stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long, Integer, Double, String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0;
    }
});

stream.extractAscendingTimestamp(record => record._1)

To use ingestion time semantics, you need to:

Call env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime).

You can think of this setting as a shorthand for event time: since the source attaches the current machine time, and both ingestion and watermark emission happen inside Flink, Flink can infer the remaining parameters and sets them automatically.

Windows on Keyed Data Streams

Flink offers a variety of methods for defining windows on a KeyedStream. Each of these windows groups elements that share the same key.

Basic Window Constructs

Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. See first if your use case can be served by the pre-defined windows below before moving to defining your own windows.

Transformation Description
Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "tumbles": elements are grouped according to their timestamp in groups of 5-second duration, and every element belongs to exactly one window. The notion of time is determined by the time characteristic set on the environment (see above).

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second: elements are grouped according to their timestamp in groups of 5-second duration, and an element can belong to more than one window.

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "tumbles": every element belongs to exactly one window.

keyedStream.countWindow(1000);

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements: an element can belong to more than one window.

keyedStream.countWindow(1000, 100)

Transformation Description
Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time).

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS))

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds) The notion of time is specified by the selected TimeCharacteristic (see time).

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

keyedStream.countWindow(1000)

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements).

keyedStream.countWindow(1000, 100)

Advanced Window Constructs

The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, below is a window definition where every window holds the elements of the last 5 seconds and slides every 1 second, but the window's execution function is triggered once 100 elements have been added to the window, and after every triggering the window retains 10 elements:

keyedStream
    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(Count.of(100))
    .evictor(Count.of(10));

keyedStream
  .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(Count.of(100))
  .evictor(Count.of(10))

The general recipe for building a custom window is to specify:

(1) a WindowAssigner,

(2) a Trigger (optionally),

(3) an Evictor (optionally).

The WindowAssigner defines how incoming elements are assigned to windows. A window is a logical group of elements with a begin value and an end value, corresponding to a begin time and an end time; elements are assigned to windows based on their timestamps.

For example, the sliding time window assigner with a duration of 5 seconds and a slide of 1 second defines, assuming time starts at 0 milliseconds, the windows [0, 5000], [1000, 6000], [2000, 7000], [3000, 8000], [4000, 9000], [5000, 10000], and so on. Every incoming element is assigned to the windows that its timestamp falls into, so it may appear in several windows; for example, an element with timestamp 2000 is assigned to the first three windows above. Flink comes bundled with window assigners that cover the most common use cases; you can also write your own window type by extending the WindowAssigner class.

Transformation Description
Global window
KeyedStream → WindowedStream

All incoming elements with a given key are assigned to the same window. The window does not have a default trigger, hence it will never be evaluated if a trigger is not explicitly specified.

stream.window(GlobalWindows.create());

Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window based on their timestamp; windows do not overlap, i.e., each element is assigned to at most one window. The window comes with a default trigger: for event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received, whereas for processing time it is triggered when the current processing time exceeds its end value.

stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));

Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to windows based on their timestamp; windows "slide" by a given amount and may overlap, so an element can be assigned to multiple windows. The window comes with a default trigger: for event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received, whereas for processing time it is triggered when the current processing time exceeds its end value.

stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)));

Transformation Description
Global window
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

stream.window(GlobalWindows.create)

Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, whereas for processing time when the current processing time exceeds its current end value.

stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))

Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, whereas for processing time when the current processing time exceeds its current end value.

stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))

The Trigger specifies when the function that follows the window clause (e.g., sum, count) is evaluated ("fires") for each window. If a trigger is not specified, the default trigger of the window type is used. Flink comes bundled with a set of triggers; if these do not fit your application, you can write your own trigger by implementing the Trigger interface. Note that specifying a trigger overrides the default trigger of the window type.

Transformation Description
Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(ProcessingTimeTrigger.create());

Watermark trigger

A window is fired when a watermark with a value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(EventTimeTrigger.create());

Continuous processing time trigger

A window is periodically considered for firing (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Continuous watermark time trigger

A window is periodically considered for firing (every 5 seconds in the example). The window is actually fired only when a watermark with a value that exceeds its end-value has been received. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Count trigger

A window is fired when it contains more than a certain number of elements (1000 below). The elements of the triggered window are retained.

windowedStream.trigger(CountTrigger.of(1000));

Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));

Delta trigger

A window is periodically considered for firing (every 5000 milliseconds in the example). A window is actually fired when the value of the last added element exceeds the value of the first element inserted in the window, according to a DeltaFunction.

windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldValue, Double newValue) {
        return newValue - oldValue;
    }
}));

Transformation Description
Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(ProcessingTimeTrigger.create);

Watermark trigger

A window is fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(EventTimeTrigger.create);

Continuous processing time trigger

A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Continuous watermark time trigger

A window is periodically considered for being fired (every 5 seconds in the example). A window is actually fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Count trigger

A window is fired when it has more than a certain number of elements (1000 below). The elements of the triggered window are retained.

windowedStream.trigger(CountTrigger.of(1000));

Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));

Delta trigger

A window is periodically considered for being fired (every 5000 milliseconds in the example). A window is actually fired when the value of the last added element exceeds the value of the first element inserted in the window according to a `DeltaFunction`.

windowedStream.trigger(DeltaTrigger.of(5000.0, { (oldValue, newValue) => newValue - oldValue }))

After the trigger fires, and before the window function (e.g., sum, count) is applied, an optional Evictor removes some of the retained elements. Flink comes bundled with a set of evictors; you can also write your own evictor by implementing the Evictor interface.

Transformation Description
Time evictor

Evicts elements from the beginning of the window, so that only the elements from end-value minus 1 second until end-value are retained (the resulting window size is 1 second).

triggeredStream.evict(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)));

Count evictor

Retains the last 1000 elements of the window and evicts all others.

triggeredStream.evict(CountEvictor.of(1000));

Delta evictor

Starting from the beginning of the window, evicts elements until an element with a value lower than the value of the last element is found (as determined by a threshold and a DeltaFunction).

triggeredStream.evict(DeltaEvictor.of(5000.0, new DeltaFunction<Double>() {
    public double getDelta(Double oldValue, Double newValue) {
        return newValue - oldValue;
    }
}));

Transformation Description
Time evictor

Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).

triggeredStream.evict(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)));

Count evictor

Retain 1000 elements from the end of the window backwards, evicting all others.

triggeredStream.evict(CountEvictor.of(1000));

Delta evictor

Starting from the beginning of the window, evict elements until an element with value lower than the value of the last element is found (by a threshold and a DeltaFunction).

windowedStream.evict(DeltaEvictor.of(5000.0, { (oldValue, newValue) => newValue - oldValue }))

Recipes for Building Windows

The mechanism of window assigner, trigger, and evictor is powerful, and it allows you to define many different kinds of windows. Flink's basic window constructs are, in fact, syntactic sugar on top of this general mechanism. Below is how some common types of windows can be constructed using the general mechanism.

Window type Definition
Tumbling count window

stream.countWindow(1000)

stream.window(GlobalWindows.create()).trigger(CountTrigger.of(1000)).evict(CountEvictor.of(1000))

Sliding count window

stream.countWindow(1000, 100)

stream.window(GlobalWindows.create()).trigger(CountTrigger.of(1000)).evict(CountEvictor.of(100))

Tumbling event time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS))

stream.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).trigger(EventTimeTrigger.create())

Sliding event time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))

stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))).trigger(EventTimeTrigger.create())

Tumbling processing time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS))

stream.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).trigger(ProcessingTimeTrigger.create())

Sliding processing time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))

stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))).trigger(ProcessingTimeTrigger.create())

Windows on Unkeyed Data Streams

You can also define windows on regular (non-keyed) data streams using the windowAll transformation. These windowed data streams have all the capabilities of keyed windowed streams, but are evaluated in a single task (and hence on a single computing node). The syntax for defining triggers and evictors is exactly the same:

nonKeyedStream
    .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(Count.of(100))
    .evictor(Count.of(10));

nonKeyedStream
  .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(Count.of(100))
  .evictor(Count.of(10))

The basic window definitions are also available for windows on non-keyed streams:

Transformation Description
Tumbling time window all
DataStream → WindowedStream

Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS));

Sliding time window all
DataStream → WindowedStream

Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds) The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));

Tumbling count window all
DataStream → WindowedStream

Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

nonKeyedStream.countWindowAll(1000)

Sliding count window all
DataStream → WindowedStream

Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements).

nonKeyedStream.countWindowAll(1000, 100)

Back to top

Execution Parameters

Fault Tolerance

Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a persistent or durable source that can be asked for prior records again (Apache Kafka is a good example of a durable source).

The checkpointing mechanism stores the progress in the source as well as the user-defined state (see Working with State) consistently to provide exactly-once processing guarantees.

To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • Number of retries: The setNumberOfExecutionRetries() method defines how many times the job is restarted after a failure. When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
  • exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) applications, as in the sketch after this list.
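For example, a minimal configuration sketch (the interval and retry values are placeholders, and the CheckpointingMode argument is assumed to be the optional mode mentioned above):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// draw a checkpoint every 5 seconds with exactly-once guarantees
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

// restart the job at most 3 times after a failure
env.setNumberOfExecutionRetries(3);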

The docs on streaming fault tolerance describe in detail the technique behind Flink’s streaming fault tolerance mechanism.

Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:

Source Guarantees Notes
Apache Kafka exactly once Use the appropriate Kafka connector for your version
RabbitMQ at most once  
Twitter Streaming API at most once  
Collections at most once  
Files at least once At failure the file will be read from the beginning
Sockets at most once  

To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:

Sink Guarantees Notes
HDFS rolling sink exactly once Implementation depends on Hadoop version
Elasticsearch at least once  
Kafka producer at least once  
File sinks at least once  
Socket sinks at least once  
Standard output at least once  

Parallelism

You can control the number of parallel instances created for each operator by calling the operator.setParallelism(int) method.
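For example (a minimal sketch; MyMapper and the parallelism values are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // default parallelism for all operators of this job

env.socketTextStream("localhost", 9999)
    .map(new MyMapper()).setParallelism(2) // this map runs with 2 parallel instances
    .print();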

Controlling Latency

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.

Usage:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

Back to top

Working with State

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators. You can make every transformation (map, filter, etc.) stateful by declaring local variables or using Flink’s state interface. You can register any local variable as managed state by implementing an interface. In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

The end effect is that updates to any form of state are the same under failure-free execution and execution under failures.

First, we look at how to make local variables consistent under failures, and then we look at Flink’s state interface.

By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system), which can be configured in the flink-conf.yaml or via StreamExecutionEnvironment.setStateBackend(…).
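For example, a sketch of switching to a file system backend (assuming the FsStateBackend class; the HDFS path is a placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep checkpoint data in HDFS instead of the JobManager's memory
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));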

Checkpointing Local Variables

Local variables can be checkpointed by using the Checkpointed interface.

When the user-defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods will be executed to draw and restore function state.

In addition to that, user functions can also implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.

For example, here is the same counting reduce function shown for OperatorState, implemented using the Checkpointed interface instead:

public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {

    // persistent counter
    private long counter = 0;

    @Override
    public Long reduce(Long value1, Long value2) {
        counter++;
        return value1 + value2;
    }

    // regularly persists state during normal operation
    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Long state) {
        counter = state;
    }
}
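The same function could additionally implement the CheckpointNotifier interface to learn when a checkpoint has actually been completed. A hedged sketch (the commit logic is only illustrative):

public class NotifyingCounterSum implements ReduceFunction<Long>, Checkpointed<Long>, CheckpointNotifier {

    // ... reduce(), snapshotState() and restoreState() as in CounterSum above ...

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // the checkpoint with this id is now durable; external side effects
        // (for example committing a transaction) could be finalized here
    }
}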

Using the Key/Value State Interface

The state interface gives access to key/value states, which are a collection of key/value pairs. Because the state is partitioned by the keys (distributed across workers), it can only be used on the KeyedStream, created via stream.keyBy(…) (which means also that it is usable in all types of functions on keyed windows).

The handle to the state can be obtained from the function’s RuntimeContext. The state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.

The following code sample shows how to use the key/value state inside a reduce function. When creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).

public class CounterSum extends RichReduceFunction<Long> {

    /** The state handle */
    private OperatorState<Long> counter;

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        counter.update(counter.value() + 1);
        return value1 + value2;
    }

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}

State updated by this is usually kept locally inside the Flink process (unless one explicitly configures an external state backend). This means that lookups and updates are process-local and therefore very fast.

The important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the keyBy() function), making the key partitioning transparent to Flink. That allows the system to efficiently restore and redistribute keys and state.

The Scala API has shortcuts for stateful map() or flatMap() functions on KeyedStream, which pass the state of the current key as an Option directly into the function and let the function return the result together with a state update:

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Stateful Source Functions

Stateful sources require a bit more care than other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements Checkpointed<Long> {

    /** current offset for exactly once semantics */
    private long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return offset;
    }

    @Override
    public void restoreState(Long state) {
        offset = state;
    }
}

Some operators might need to know when a checkpoint has been fully acknowledged by Flink in order to communicate that with the outside world. In this case see the flink.streaming.api.checkpoint.CheckpointNotifier interface.

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Back to top

Iterations

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream:

IterativeStream<Integer> iteration = input.iterate();

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation):

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back from the part of the stream which is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.
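For example, a sketch of overriding it (assuming the boolean argument tells closeWith to keep the feedback stream's own partitioning instead of re-partitioning it like the iteration head's input):

// keep the partitioning of the feedback stream itself
iteration.closeWith(iterationBody.filter(/* one part of the stream */), true);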

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value > 0);
    }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value <= 0);
    }
});

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished from the elements that are forwarded downstream by using filters.

val iteratedStream = someDataStream.iterate(iteration => {
  val iterationBody = iteration.map(/* this is executed many times */)
  (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(iteration => {
  val minusOne = iteration.map(v => v - 1)
  val stillGreaterThanZero = minusOne.filter(_ > 0)
  val lessThanZero = minusOne.filter(_ <= 0)
  (stillGreaterThanZero, lessThanZero)
})

Back to top

Connectors

Connectors provide code for interfacing with various third-party systems.

Currently these systems are supported:

  • Apache Kafka (sink/source)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (sink/source)
  • Twitter Streaming API (source)

To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. Docker containers are also provided encapsulating these services to aid users getting started with connectors.

Apache Kafka

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanisms to provide different processing guarantees (most importantly exactly-once guarantees).

For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers. The Kafka consumer might commit offsets to Kafka which have not been processed successfully.

Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the flink-connector-kafka-083 package and the FlinkKafkaConsumer082 class are appropriate.

Package Supported since Class name Kafka version Checkpointing behavior Notes
flink-connector-kafka 0.9, 0.10 KafkaSource 0.8.1, 0.8.2 Does not participate in checkpointing (no consistency guarantees) Uses the old, high level KafkaConsumer API, autocommits to ZK via Kafka
flink-connector-kafka 0.9, 0.10 PersistentKafkaSource 0.8.1, 0.8.2 Does not guarantee exactly-once processing, element order, or strict partition assignment Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually
flink-connector-kafka-083 0.9.1, 0.10 FlinkKafkaConsumer081 0.8.1 Guarantees exactly-once processing Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK manually
flink-connector-kafka-083 0.9.1, 0.10 FlinkKafkaConsumer082 0.8.2 Guarantees exactly-once processing Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK manually

Then, import the connector in your maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka’s quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • On 32 bit computers this problem may occur.
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine’s IP address.

Kafka Consumer

The standard FlinkKafkaConsumer082 is a Kafka consumer providing access to one topic.

The following parameters have to be provided for the FlinkKafkaConsumer082(...) constructor:

  1. The topic name
  2. A DeserializationSchema
  3. Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma separated list of Kafka brokers)
    • “zookeeper.connect” (comma separated list of Zookeeper servers)
    • “group.id” the id of the consumer group

Example:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
stream.print();

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
stream.print()

Kafka Consumers and Fault Tolerance

As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.

The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will continue on reading from where it left off after a restart. For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

Kafka Sink

A class providing an interface for sending data to Kafka.

The following arguments have to be provided for the KafkaSink(…) constructor in order:

  1. Broker address (in hostname:port format, can be a comma separated list)
  2. The topic name
  3. Serialization schema

Example:

stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));

stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))

The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:

public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema)

public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, SerializationSchema serializationSchema)

If this constructor is used, the user needs to make sure to set the broker(s) with the “metadata.broker.list” property. Also the serializer configuration should be left default, and the serialization should be set via SerializationSchema.
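A sketch of using this constructor (the addresses and topic name are placeholders; note that the brokers are set through the producer configuration, not through the first argument):

Properties producerConfig = new Properties();
// required: the Kafka brokers the producer writes to
producerConfig.setProperty("metadata.broker.list", "localhost:9092");

stream.addSink(new KafkaSink<String>("localhost:2181", "test", producerConfig, new SimpleStringSchema()));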

The Apache Kafka official documentation can be found here.

Back to top

Elasticsearch

This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating a Sink for writing to your cluster.

Elasticsearch Sink

The connector provides a Sink that can send data to an Elasticsearch Index.

The sink can use two different methods for communicating with Elasticsearch:

  1. An embedded Node
  2. The TransportClient

See here for information about the differences between the two modes.

This code shows how to create a sink that uses an embedded Node for communication:

DataStream<String> input = ...;

Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));

val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))

Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter that must correspond to the name of your cluster.

Internally, the sink uses a BulkProcessor to send index requests to the cluster. This will buffer elements before sending a request to the cluster. The behaviour of the BulkProcessor can be configured using these config keys:

  • bulk.flush.max.actions: Maximum amount of elements to buffer
  • bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer
  • bulk.flush.interval.ms: Interval at which to flush data regardless of the other two settings, in milliseconds

This example code does the same, but with a TransportClient:

DataStream<String> input = ...;

Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

List<TransportAddress> transports = new ArrayList<TransportAddress>();
transports.add(new InetSocketTransportAddress("node-1", 9300));
transports.add(new InetSocketTransportAddress("node-2", 9300));

input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));

val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

val transports = new util.ArrayList[TransportAddress]
transports.add(new InetSocketTransportAddress("node-1", 9300))
transports.add(new InetSocketTransportAddress("node-2", 9300))

input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))

The difference is that we now need to provide a list of Elasticsearch Nodes to which the sink should connect using a TransportClient.

More information about Elasticsearch can be found here.

Back to top

Hadoop FileSystem

This connector provides a Sink that writes rolling files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Rolling File Sink

The rolling behaviour as well as the writing can be configured but we will get to that later. This is how you can create a default rolling sink:

DataStream<String> input = ...;

input.addSink(new RollingSink<String>("/base/path"));

val input: DataStream[String] = ...

input.addSink(new RollingSink("/base/path"))

The only required parameter is the base path where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, writer and batch size.

By default the rolling sink will use the pattern "yyyy-MM-dd--HH" to name the rolling buckets. This pattern is passed to SimpleDateFormat with the current system time to form a bucket path. A new bucket will be created whenever the bucket path changes. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: Each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. To specify a custom bucketer use setBucketer() on a RollingSink.

The default writer is StringWriter. This will call toString() on the incoming elements and write them to part files, separated by newline. To specify a custom writer use setWriter() on a RollingSink. If you want to write Hadoop SequenceFiles you can use the provided SequenceFileWriter which can also be configured to use compression.

The last configuration option is the batch size. This specifies when a part file should be closed and a new one started. (The default part file size is 384 MB).

Example:

DataStream<Tuple2<IntWritable, Text>> input = ...;

RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB

input.addSink(sink);

val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB

input.addSink(sink)

This will create a sink that writes to bucket files that follow this schema:

/base/path/{date-time}/part-{parallel-task}-{count}

Where date-time is the string that we get from the date/time format, parallel-task is the index of the parallel sink instance and count is the running number of part files that were created because of the batch size.

For in-depth information, please refer to the JavaDoc for RollingSink.

Back to top

RabbitMQ

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.

RabbitMQ Source

A class providing an interface for receiving data from RabbitMQ.

The following have to be provided for the RMQSource(…) constructor in order:

  1. The hostname
  2. The queue name
  3. Deserialization schema

Example:

DataStream<String> stream = env
    .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()));
stream.print();

val stream = env
  .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
stream.print()

RabbitMQ Sink

A class providing an interface for sending data to RabbitMQ.

The following have to be provided for the RMQSink(…) constructor in order:

  1. The hostname
  2. The queue name
  3. Serialization schema

Example:

stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));

stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))

More about RabbitMQ can be found here.

Back to top

Twitter Streaming API

The Twitter Streaming API provides the opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in TwitterSource class for establishing a connection to this stream. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Authentication

In order to connect to the Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.

Acquiring the authentication information

First of all, a Twitter account is needed. Sign up for free at twitter.com/signup or sign in at Twitter’s Application Management and register the application by clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions. After selecting the application, the API key and API secret (called consumerKey and consumerSecret in TwitterSource respectively) are located on the “API Keys” tab. The necessary access token data (token and secret) can be acquired here. Remember to keep these pieces of information secret and do not push them to public repositories.

Accessing the authentication information

Create a properties file, and pass its path in the constructor of TwitterSource. The content of the file should be similar to this:

#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***

Constructors

The TwitterSource class has two constructors.

  1. public TwitterSource(String authPath, int numberOfTweets); to emit finite number of tweets
  2. public TwitterSource(String authPath); for streaming

Both constructors expect a String authPath argument determining the location of the properties file containing the authentication information. In the first case, numberOfTweets determines how many tweets the source emits.
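For example, to emit only the first 100 tweets (a sketch based on the first constructor; the path is a placeholder):

DataStream<String> firstTweets = env.addSource(new TwitterSource("/PATH/TO/myFile.properties", 100));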

Usage

In contrast to other connectors, the TwitterSource depends on no additional services. For example the following code should run gracefully:

DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));

streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))

The TwitterSource emits strings containing JSON objects. To retrieve information from the JSON you can add a FlatMap or a Map function that handles JSON. For example, the examples contain an abstract JSONParseFlatMap class. JSONParseFlatMap is an extension of the FlatMapFunction and has a

String getField(String jsonText, String field);

getField(jsonText : String, field : String) : String

function which can be used to acquire the value of a given field.
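A hedged sketch of such a flat map (assuming JSONParseFlatMap behaves as described above; the "lang" field is only an illustration):

public static class LanguageExtractor extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String tweet, Collector<Tuple2<String, Integer>> out) throws Exception {
        // read the "lang" field from the tweet's JSON and emit it with a count of one
        String lang = getField(tweet, "lang");
        if (lang != null) {
            out.collect(new Tuple2<String, Integer>(lang, 1));
        }
    }
}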

There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.

Example

TwitterLocal is an example of how to use TwitterSource. It implements a language frequency counter program.

Back to top

Docker containers for connectors

A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the docker container while the example topology can be run on the user’s computer.

Installing Docker

The official Docker installation guide can be found here. After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.

Creating a jar with all the dependencies

For the easiest setup, create a jar with all the dependencies of the flink-streaming-connectors project.

cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly
This creates an assembly jar under flink-streaming-connectors/target.

RabbitMQ

Pull the docker image:
sudo docker pull flinkstreaming/flink-connectors-rabbitmq

To run the container, type:

sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq

Now a terminal has started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost’s and the Docker container’s ports so RabbitMQ can communicate with the application through these.

To start the RabbitMQ server:

sudo /etc/init.d/rabbitmq-server start

To launch the example on the host computer, execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
> log.txt 2> errorlog.txt

There are two connectors in the example. One that sends messages to RabbitMQ, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:

<DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ

Apache Kafka

Pull the image:

sudo docker pull flinkstreaming/flink-connectors-kafka

To run the container, type:

sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
flinkstreaming/flink-connectors-kafka

Now a terminal has started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost’s and the Docker container’s ports so Kafka can communicate with the application through these. First start a zookeeper in the background:

/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
> zookeeperlog.txt &

Then start the kafka server in the background:

/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
> serverlog.txt 2> servererr.txt &

To launch the example on the host computer, execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
> log.txt 2> errorlog.txt

In the example there are two connectors. One that sends messages to Kafka, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:

<DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (3) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (4) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (5) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (6) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka

Back to top

Program Packaging & Distributed Execution

See the relevant section of the DataSet API documentation.

Back to top

Parallel Execution

See the relevant section of the DataSet API documentation.

Back to top

Execution Plans

See the relevant section of the DataSet API documentation.

Back to top
