Apache Flink

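History of Flink

First-generation big data processing: Hadoop MapReduce (2006) for batch processing on HDFS, and Apache Storm (September 2014) for stream processing.

Second-generation big data processing: Spark (February 2014), with RDD for batch processing and DStream for stream processing. DStream simulates a stream with micro-batches, so its latency is high.

Third-generation big data processing: Flink (December 2014), with DataStream for stream processing and DataSet for batch processing. It offers high throughput and low latency.

Like Spark, Flink uses a DAG model to split a job into tasks and computes in memory, but Flink is a pure stream-processing engine. Whereas Spark builds stream processing on top of batch processing, Flink takes the opposite approach and builds batch processing on top of stream processing.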

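Architecture (√)

Recommended reading: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html

Tasks & Operator Chains

“Operator Chain”: merges several operators into a single Task, which avoids unnecessary network communication between tasks.

“Task”: similar to a Stage in Spark; a job is split into several stages.

“SubTask”: every Task has a parallelism, and the Task is split into that many SubTasks (each SubTask is equivalent to a thread).

Operator Chain enabled

Operator Chain disabled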

Job Managers, Task Managers, Clients

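JobManagers: also called Masters. They are responsible for distributed task scheduling: they schedule Task execution, coordinate checkpoints, and handle recovery from failures.

There is always at least one JobManager. A high-availability setup will have multiple JobManagers, one of which is always the leader, and the others are standby.

TaskManagers: also called Workers. They execute the SubTasks of the dataflow graph (DAG) and handle data buffering and exchange. Each TaskManager actively connects to the JobManager, announces its own status, and reports the tasks it has been assigned.

There must always be at least one TaskManager.

Client: not part of the runtime (unlike the Spark Driver). It prepares the dataflow and sends it to the JobManager.

Jobs can be submitted through the web UI or with flink run: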

[root@HadoopNode00 flink-1.8.1]# ./bin/flink run --class com.baizhi.quickstart.FlinkWordCounts --detached --parallelism 3 --jobmanager HadoopNode00:8081  /root/flink-1.0-SNAPSHOT.jar
[root@HadoopNode00 flink-1.8.1]# ./bin/flink list --jobmanager HadoopNode00:8081 # list the jobs
------------------ Running/Restarting Jobs -------------------
23.12.2019 14:59:10 : 2fce0e6f136fac71ce6b1aad1ae8927e : FlinkWordCounts (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@HadoopNode00 flink-1.8.1]# ./bin/flink cancel --jobmanager HadoopNode00:8081  2fce0e6f136fac71ce6b1aad1ae8927e # cancel the job
Cancelling job 2fce0e6f136fac71ce6b1aad1ae8927e.
Cancelled job 2fce0e6f136fac71ce6b1aad1ae8927e.

Task Slots & Resources

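Each TaskManager is a JVM process that executes 1 to N SubTasks (each equivalent to a thread). The number of task slots states how many tasks a TaskManager can accept; the more task slots a node has, the greater its computing capacity.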

Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.

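Task slots divide the memory of a compute node evenly. Different jobs hold different task slots, so jobs are isolated from each other in memory at runtime. Before a job runs, it must be allocated a fixed number of task slots, equal to the highest Task parallelism in that job.

  • Different jobs are isolated from each other by task slots.
  • SubTasks of different Tasks in the same job can share a slot.
  • SubTasks of the same Task in the same job cannot share a slot.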
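The slot rules above can be sketched as a small calculation. This is a plain Scala model, not Flink API: `TaskSpec`, `requiredSlots`, `fits` and the example job are hypothetical names introduced here, and the sketch assumes that every Task of the job may share slots with the others.

```scala
object SlotSketch {
  // Hypothetical model of a Task: a name and its parallelism (number of SubTasks).
  final case class TaskSpec(name: String, parallelism: Int)

  // SubTasks of different Tasks may share a slot, but two SubTasks of the
  // same Task may not, so a job needs as many slots as its most parallel Task.
  def requiredSlots(tasks: Seq[TaskSpec]): Int =
    if (tasks.isEmpty) 0 else tasks.map(_.parallelism).max

  // The job fits when the cluster offers at least that many slots in total.
  def fits(tasks: Seq[TaskSpec], taskManagers: Int, slotsPerTaskManager: Int): Boolean =
    requiredSlots(tasks) <= taskManagers * slotsPerTaskManager

  def main(args: Array[String]): Unit = {
    val job = Seq(TaskSpec("source", 1), TaskSpec("flatMap -> map", 3), TaskSpec("sum -> print", 3))
    println(requiredSlots(job))
    println(fits(job, taskManagers = 1, slotsPerTaskManager = 4))
  }
}
```

With one TaskManager offering 4 slots, the example job (highest parallelism 3) fits and leaves one slot free.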

Resource isolation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#task-chaining-and-resource-groups

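State Backends

Flink is a stateful computation framework. All computation state is stored either in memory or in RocksDB, depending on the chosen state backend. By default, checkpointed state is kept in the JobManager's memory, which is suitable only for test environments (very small data sets). In production, one of the following two state backends is normally used: filesystem or rocksdb. Under the JobManager's coordination a streaming job checkpoints periodically to store a snapshot of its computation, and the snapshot is persisted according to the configured state backend.

Savepoints & Checkpoints

A Flink streaming job can be restored from a savepoint, so a program can be upgraded and still keep the computation state it had before the upgrade. Checkpoints are triggered periodically by the JobManager and store the computation state in the state backend; once the latest checkpoint completes, the system automatically deletes older checkpoint data. A savepoint, by contrast, is a checkpoint triggered manually, and its result is not deleted automatically.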

[root@HadoopNode00 flink-1.8.1]# ./bin/flink cancel --jobmanager HadoopNode00:8081 --withSavepoint hdfs:///flink-savepoint  ee04f833b7df47bc5c4876cede2cbfb5
Cancelling job ee04f833b7df47bc5c4876cede2cbfb5 with savepoint to hdfs:///flink-savepoint.
Cancelled job ee04f833b7df47bc5c4876cede2cbfb5. Savepoint stored in hdfs://HadoopNode00:9000/flink-savepoint/savepoint-ee04f8-2b314a0076aa.

Flink Standalone (be able to set this up)

  • Set the CentOS process and open-file limits (takes effect after a reboot) - optional
[root@HadoopNode00 ~]# vi /etc/security/limits.conf
* soft nofile 204800
* hard nofile 204800
* soft nproc 204800
* hard nproc 204800
  • Configure the hostname (takes effect after a reboot)
[root@HadoopNode00 ~]# vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=HadoopNode00
  • Map the hostname to the IP address
[root@HadoopNode00 ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.126.10 HadoopNode00
  • Disable the firewall
[root@HadoopNode00 ~]# service iptables stop
iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
iptables: Flushing firewall rules:                         [  OK  ]
iptables: Unloading modules:                               [  OK  ]
[root@HadoopNode00 ~]# chkconfig iptables off
[root@HadoopNode00 ~]# chkconfig --list | grep iptables
iptables        0:off   1:off   2:off   3:off   4:off   5:off   6:off
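  • Install JDK 1.8 and configure JAVA_HOME (~/.bashrc) - omitted
  • Configure passwordless SSH authentication - omitted
  • Install and configure Hadoop; set HADOOP_HOME and HADOOP_CLASSPATH (~/.bashrc) - omitted
  • Install and configure Flink

1. Extract flink-1.8.1-bin-scala_2.11.tgz into the target directory /home/flink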

[root@HadoopNode00 ~]# mkdir /home/flink
[root@HadoopNode00 ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /home/flink/
[root@HadoopNode00 ~]# cd /home/flink/flink-1.8.1/
[root@HadoopNode00 flink-1.8.1]# ls -l
total 628
drwxr-xr-x. 2 502 games   4096 Dec 23 11:26 bin  # scripts
drwxr-xr-x. 2 502 games   4096 Jun 25 16:10 conf # configuration directory
drwxr-xr-x. 6 502 games   4096 Dec 23 11:26 examples # examples
drwxr-xr-x. 2 502 games   4096 Dec 23 11:26 lib # system dependency jars
-rw-r--r--. 1 502 games  11357 Jun 14  2019 LICENSE
drwxr-xr-x. 2 502 games   4096 Dec 23 11:26 licenses
drwxr-xr-x. 2 502 games   4096 Jun 24 23:02 log # startup logs; check here when something fails
-rw-r--r--. 1 502 games 596009 Jun 24 23:02 NOTICE
drwxr-xr-x. 2 502 games   4096 Dec 23 11:26 opt # optional third-party jars; copy them into lib when needed
-rw-r--r--. 1 502 games   1308 Jun 14  2019 README.txt
[root@HadoopNode00 flink-1.8.1]# tree conf/
conf/
├── flink-conf.yaml  # main configuration file √
├── log4j-cli.properties
├── log4j-console.properties
├── log4j.properties
├── log4j-yarn-session.properties
├── logback-console.xml
├── logback.xml
├── logback-yarn.xml
├── masters # master node information; not needed on a single machine
├── slaves  # compute node information √
├── sql-client-defaults.yaml
└── zoo.cfg

2. Edit the slaves configuration file

[root@HadoopNode00 flink-1.8.1]# vi conf/slaves

HadoopNode00

3. Configure flink-conf.yaml

#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: HadoopNode00
# Number of task slots (compute resources) offered by each worker
taskmanager.numberOfTaskSlots: 4
# Default parallelism for jobs
parallelism.default: 3

4. Start the Flink service

[root@HadoopNode00 flink-1.8.1]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host HadoopNode00.
Starting taskexecutor daemon on host HadoopNode00.
[root@HadoopNode00 flink-1.8.1]# jps
10833 TaskManagerRunner
10340 StandaloneSessionClusterEntrypoint
10909 Jps

Quick Start

  • Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>7</source>
                <target>7</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.0.1</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <!-- Maven packaging plugin: builds the plain jar and a jar with third-party dependencies bundled in -->
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                    <manifestEntries>
                        <Class-Path>.</Class-Path>
                    </manifestEntries>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- run the jar assembly during the package phase -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • Example code (be able to write it from memory)
import org.apache.flink.streaming.api.scala._

object FlinkWordCounts {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("HadoopNode00",9999)
      .flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
      .print()
    fsEnv.execute("FlinkWordCounts")
  }
}

Flink program structure

import org.apache.flink.streaming.api.scala._
// 1. Create the StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Specify the input Source (covered in detail later)
fsEnv.socketTextStream("HadoopNode00",9999)
// 3. Apply common Flink operators (covered in detail later)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
// 4. Specify the output Sink (covered in detail later)
.print()
// 5. Run the stream computation
fsEnv.execute("FlinkWordCounts")

StreamExecutionEnvironment

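// For local testing only; the user must specify the parallelism
val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3)
// Cross-platform submission to a remote cluster, for integration testing
var jarFiles="flink\\target\\flink-1.0-SNAPSHOT.jar"
val fsEnv = StreamExecutionEnvironment.createRemoteEnvironment("HadoopNode00",8081,3,jarFiles )
// This mode detects automatically whether it runs in production or inside an IDE
// Inside an IDE it uses the number of CPU cores as the test parallelism
// In production the parallelism has to be given with --parallelism
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment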

DataSource

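A DataSource is where the program reads its input. It is normally specified by calling fsEnv#addSource. Flink ships some built-in SourceFunctions for testing and also lets users define custom sources, typically by implementing SourceFunction (non-parallel) or by implementing ParallelSourceFunction or extending RichParallelSourceFunction (parallel).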

File-based

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings. This behaves like batch processing: the input is processed only once.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.readTextFile("file:///D:/demo")
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsProductEnv")
  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
var inoutFormat= new TextInputFormat(null)
var filePath="file:///D:/demo"
fsEnv.readFile(inoutFormat,filePath)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsReadFile")
  • readFile(fileInputFormat, path, watchType, interval, pathFilter) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
var inoutFormat= new TextInputFormat(null)
var filePath="file:///D:/demo"
inoutFormat.setFilesFilter(new FilePathFilter {
  override def filterPath(filePath: Path): Boolean = {
    // returning true excludes the file, so files ending in .txt are filtered out
    filePath.getName.endsWith(".txt")
  }
})
fsEnv.readFile(inoutFormat,filePath,FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsReadFile")

If the content of a monitored file changes, Flink reprocesses the entire file, which produces duplicate results.

Socket-based

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.addSource[String](new SocketTextStreamFunction("HadoopNode00",9999,"\n",3))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsSocketBased")

Collection-based:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.fromCollection(Array("this is a demo","my name is jimi"))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsCollection")

UserDefineSource

object FlinkWordCountsSourceFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.addSource[String](new UserDefineSourceFunction)
      .flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
      .print()
    fsEnv.execute("FlinkWordCountsCollection")
  }
}
class UserDefineSourceFunction extends SourceFunction[String]{
  @volatile // keeps threads from working on a cached copy of this flag
  private var isRunning = true
  var messages=Array("this is a demo","hello world")
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while(isRunning){
      Thread.sleep(1000)
      val randomIndex = new Random().nextInt(messages.length)
      ctx.collect(messages(randomIndex)) // emit the record downstream
    }
  }
  override def cancel(): Unit = {
    println("===cancel===")
    isRunning=false
  }
}

Kafka (must master √)

Kafka 0.11.0.0: FlinkKafkaConsumer011 | FlinkKafkaProducer011

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
fsEnv.addSource[String](new FlinkKafkaConsumer011("flink_kafka",new SimpleStringSchema(),props))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsCollection")

Kafka 1.0.0+: FlinkKafkaConsumer | FlinkKafkaProducer

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
fsEnv.addSource[String](new FlinkKafkaConsumer("flink_kafka",new SimpleStringSchema(),props))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsCollection")
  • Reading the record key and metadata
class UserDefineKeyedDeserializationSchema extends KafkaDeserializationSchema[(String,String,Int,Long,Long)]{
  override def isEndOfStream(t: (String, String, Int, Long, Long)): Boolean = false
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long, Long) = {
    var value:String=null
    var key:String=null
    if(consumerRecord.value()!=null){
      value=new String(consumerRecord.value())
    }
    if(consumerRecord.key()!=null){
      key=new String(consumerRecord.key())
    }
    var partition=consumerRecord.partition()
    var offset=consumerRecord.offset()
    var timestamp=consumerRecord.timestamp()
    (value,key,partition,offset,timestamp)
  }
  override def getProducedType: TypeInformation[(String, String, Int, Long, Long)] = {
    createTypeInformation[(String, String, Int, Long, Long)]
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
fsEnv.addSource[(String,String,Int,Long,Long)](new FlinkKafkaConsumer("flink_kafka",new UserDefineKeyedDeserializationSchema(),props))
.print()
fsEnv.execute("FlinkWordCountsKafkaConsumer02")
  • Parsing JSON data from Kafka
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
fsEnv.addSource[ObjectNode](new FlinkKafkaConsumer011("flink_kafka",new JSONKeyValueDeserializationSchema(true),props))
.map(on=>(on.get("value").get("name").asText(),on.get("value").get("id").asInt(),on.get("metadata")))
.print()
fsEnv.execute("FlinkWordCountsKafkaConsumer04")

For more connectors, see: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/

If no existing connector fits, you have to implement a custom Source.

DataSink

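A DataSink consumes the data in a DataStream and writes it to an external system such as a network socket, message queue, database, or file. Flink predefines some common DataSinks and also lets users define their own by implementing SinkFunction or extending RichSinkFunction.

write*

writeAsText() / writeAsCsv() / writeUsingOutputFormat() / writeToSocket() (not required)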

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
fsEnv.addSource[String](new FlinkKafkaConsumer011("flink_kafka",new SimpleStringSchema(),props))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.writeAsText("file:///D:/flink/results",WriteMode.OVERWRITE)
fsEnv.execute("FlinkWordCountsCollection")

Output to the target file system may be delayed, so these methods are generally used only for testing.

BucketingSink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
var inoutFormat= new TextInputFormat(null)
var filePath="file:///D:/demo"
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://HadoopNode00:9000/BucketingSink")
bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
bucketingSink.setBatchSize(1024 * 1024 * 128) // 128 MB
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // 20 minutes
bucketingSink.setWriter(new StringWriter[(String,Int)]())
fsEnv.readFile(inoutFormat,filePath)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.addSink(bucketingSink)
fsEnv.execute("FlinkWordCountsBucketingSink")

UserDefineSink √

class UserDefineSinkFunction(additionalKey:String) extends RichSinkFunction[(String,Int)]{
  var pool:JedisPool=_
  var resource:Jedis = _
  override def open(parameters: Configuration): Unit = {
    pool=new JedisPool("HadoopNode00",6379)
    resource=pool.getResource
  }
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    resource.hset(additionalKey,value._1,value._2.toString)
  }
  override def close(): Unit = {
    resource.close()
    pool.close()
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
var inoutFormat= new TextInputFormat(null)
var filePath="file:///D:/demo"
fsEnv.readFile(inoutFormat,filePath)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.addSink(new UserDefineSinkFunction("wordcounts"))
fsEnv.execute("FlinkWordCountsUserDefineSink")

print/printToErr

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.printToErr("test")
fsEnv.execute("FlinkWordCounts")

RedisSink(√)

Reference: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
class UserDefineRedisMapper(additionalKey:String) extends RedisMapper[(String,Int)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,additionalKey)
  }
  override def getKeyFromData(t: (String, Int)): String = {
    t._1
  }
  override def getValueFromData(t: (String, Int)): String = {
    t._2.toString
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
val jedisConfig = new FlinkJedisPoolConfig.Builder()
.setHost("hadoopnode00")
.setPort(6379)
.build
fsEnv.addSource[String](new FlinkKafkaConsumer011("flink_kafka",new SimpleStringSchema(),props))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.addSink(new RedisSink[(String,Int)](jedisConfig,new UserDefineRedisMapper("wc_redis")))
fsEnv.execute("FlinkWordCountsRedisSink")

KafkaSink √

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
class UserDefineKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
  override def serializeKey(element: (String, Int)): Array[Byte] = {
    element._1.getBytes()
  }
  override def serializeValue(element: (String, Int)): Array[Byte] = {
    element._2.toString.getBytes()
  }
  override def getTargetTopic(element: (String, Int)): String = {
    "count_topic"
  }
}
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props1 = new Properties()
props1.setProperty("bootstrap.servers", "hadoopnode00:9092")
props1.setProperty("group.id", "flink")
val props2 = new Properties()
// No key/value serializers need to be configured here
props2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoopnode00:9092")
props2.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
props2.setProperty(ProducerConfig.ACKS_CONFIG,"all")
props2.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"1024")
props2.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")
fsEnv.addSource[String](new FlinkKafkaConsumer011("flink_kafka",new SimpleStringSchema(),props1))
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer011[(String, Int)]("defaultTopic",new UserDefineKeyedSerializationSchema,props2))
fsEnv.execute("FlinkWordCountsKafkaSink")

Flink Operators (know how to use them)

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/

DataStream → DataStream

Map

Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }

FlatMap

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap { str => str.split("\\s+") }

Filter

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.filter(!_._1.equals("error")) // drop tuples whose word is "error"
.print()
fsEnv.execute("FlinkDatastream2Datastream")

DataStream → KeyedStream

KeyBy

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning.

dataStream.keyBy("field name") // Key by field "someKey"
dataStream.keyBy(position) // Key by the first element of a Tuple

Reduce

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

keyedStream.reduce(_ + _)

Fold

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence “start-1”, “start-1-2”, “start-1-2-3”, …

val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
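The rolling behaviour of fold can be illustrated without a cluster. The sketch below is plain Scala on an in-memory sequence, not Flink API; `rollingFold` is a hypothetical helper that models what a single key would emit, one output per input element.

```scala
object RollingFoldSketch {
  // Emits every intermediate result of folding `input` starting from `init`.
  // scanLeft also yields the bare initial value first, which a rolling fold
  // never emits, so it is dropped with `tail`.
  def rollingFold[A, B](input: Seq[A], init: B)(f: (B, A) => B): Seq[B] =
    input.scanLeft(init)(f).tail

  def main(args: Array[String]): Unit = {
    val emitted = rollingFold(Seq(1, 2, 3, 4, 5), "start")((str, i) => str + "-" + i)
    emitted.foreach(println)
  }
}
```

Each input element produces exactly one output, and each output carries the accumulated value so far, which is what distinguishes a rolling fold from a batch-style fold that returns only the final result.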
Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//001 zhangsan R&D 25000 25
//002 lisi R&D 30000 30
//003 wangwu Product 35000 35
fsEnv.socketTextStream("HadoopNode00",9999)
.map(line=>line.split("\\s+"))
.map(tokens=>(tokens(0),tokens(1),tokens(2),tokens(3).toDouble,tokens(4).toInt))
.keyBy(2)
.minBy(4) // returns the whole record that holds the minimum; min only returns the minimum value and leaves the other fields unchanged
.print()
fsEnv.execute("FlinkDatastream2KeyedStream03")

case class Employee(id:String,name:String,dept:String,salary:Double,age:Int)
object FlinkDatastream2KeyedStream01 {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    //001 zhangsan R&D 25000 25
    //002 lisi R&D 30000 30
    //003 wangwu Product 35000 35
    fsEnv.socketTextStream("HadoopNode00",9999)
      .map(line=>line.split("\\s+"))
      .map(tokens=>Employee(tokens(0),tokens(1),tokens(2),tokens(3).toDouble,tokens(4).toInt))
      .keyBy("dept")
      .sum("salary")
      .print()
    fsEnv.execute("FlinkDatastream2KeyedStream01")
  }
}
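The difference between min and minBy is easier to see on concrete records. The following is a simplified plain Scala model for the records of one key, not Flink API: `rollingMinBy` and `rollingMin` are hypothetical helpers, the input is assumed to be non-empty, and the detail that min keeps the other fields of the first record seen is an assumption of this model.

```scala
object MinVsMinBySketch {
  // (id, name, dept, salary, age), the same shape as the tuples used above
  type Emp = (String, String, String, Double, Int)

  // Rolling minBy on age: keep whichever whole record has the smaller age.
  def rollingMinBy(records: Seq[Emp]): Seq[Emp] =
    records.tail.scanLeft(records.head)((acc, r) => if (r._5 < acc._5) r else acc)

  // Rolling min on age: keep the other fields of the first record and only
  // replace the age field with the smallest age seen so far.
  def rollingMin(records: Seq[Emp]): Seq[Emp] =
    records.tail.scanLeft(records.head)((acc, r) => acc.copy(_5 = math.min(acc._5, r._5)))

  def main(args: Array[String]): Unit = {
    val dept = Seq(
      ("001", "zhangsan", "dev", 25000.0, 25),
      ("002", "lisi", "dev", 30000.0, 20)
    )
    rollingMinBy(dept).foreach(println)
    rollingMin(dept).foreach(println)
  }
}
```

After the second record, the minBy model emits the complete lisi record, while the min model still shows zhangsan's id, name and salary with only the age lowered to 20.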

DataStream* → DataStream

Union

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...)

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//001 zhangsan R&D 25000 25
//002 lisi R&D 30000 30
//003 wangwu Product 35000 35
var dataStream1:DataStream[String]= fsEnv.socketTextStream("HadoopNode00",9999)
var dataStream2:DataStream[String]= fsEnv.socketTextStream("HadoopNode00",8888)
var dataStream3:DataStream[String]= fsEnv.socketTextStream("HadoopNode00",7777)
dataStream1.union(dataStream2,dataStream3)
.map(line=>line.split("\\s+"))
.map(tokens=>(tokens(2),tokens(3).toDouble))
.keyBy(0)
// accumulator: (dept, total salary, record count)
.fold(("",0.0,0))((init,value)=>( value._1,init._2+value._2,init._3+1))
// emit (dept, total salary, average salary)
.map(v=>(v._1,v._2,v._2/v._3))
.print()
fsEnv.execute("FlinkManyDatastream2OneDatastream")

DataStream,DataStream → ConnectedStreams

Connect

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap

Similar to map and flatMap on a connected data stream

connectedStreams.map(
  (_ : Int) => true,
  (_ : String) => false
)
connectedStreams.flatMap(
  (_ : Int) => true,
  (_ : String) => false
)

// User is not defined in the original notes; its fields are inferred from how it is used below
case class User(id:Int,name:String,salary:Double)

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoopnode00:9092")
props.setProperty("group.id", "flink")
//1 zhangsan 18000
var dataStream1:DataStream[String]= fsEnv.socketTextStream("HadoopNode00",9999)
//{"id":1,"name":"zhangsan","salary":18000}
var dataStream2:DataStream[ObjectNode]= fsEnv.addSource[ObjectNode](new FlinkKafkaConsumer011[ObjectNode]("flink_kafka",new JSONKeyValueDeserializationSchema(true),props))
dataStream1.connect(dataStream2)
.map(new CoMapFunction[String,ObjectNode,User] {
  override def map1(value: String): User = {
    val tokens = value.split("\\s+")
    User(tokens(0).toInt,tokens(1),tokens(2).toDouble)
  }
  override def map2(value: ObjectNode): User = {
    val id = value.get("value").get("id").asInt()
    val name = value.get("value").get("name").asText()
    val salary = value.get("value").get("salary").asDouble()
    User(id,name,salary)
  }
})
.keyBy("id","name")
.sum("salary")
.print()

DataStream → SplitStream

Split

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select

Select one or more streams from a split stream.

val even = split.select("even")
val odd = split select "odd"
val all = split.select("even","odd")

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val splitStream: SplitStream[String] = fsEnv.socketTextStream("HadoopNode00", 9999)
.split(value=>{
  if (value.contains("error")) {
    List("error")
  } else {
    List("info")
  }
})
splitStream.select("error").printToErr("error")
splitStream.select("info").print("info")
splitStream.select("error","info").print("all")
fsEnv.execute("FlinkConnectStream2Datastream02")

An alternative way to write this, using a side output:

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val outputTag = new OutputTag[String]("error")
val dataStream: DataStream[String] = fsEnv.socketTextStream("HadoopNode00", 9999)
.process(new ProcessFunction[String, String] {
  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    if (value.contains("error")) {
      ctx.output(outputTag, value) // route error lines to the side output
    } else {
      out.collect(value)
    }
  }
})
dataStream.print("normal")
dataStream.getSideOutput(outputTag).printToErr("error")

Physical Partitioning

Rebalancing (Round-robin partitioning), the default strategy

Round-robin: records are sent to the downstream tasks in turn.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.rebalance
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.printToErr("test")
fsEnv.execute("FlinkWordCounts")

Random partitioning

Records are sent to downstream tasks at random.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.shuffle
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.printToErr("test")
fsEnv.execute("FlinkWordCounts")

Rescaling

Each upstream partition sends its data round-robin to a subset of the downstream partitions. The upstream and downstream parallelism should be integer multiples of each other.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.flatMap(line=>line.split("\\s+"))
.setParallelism(4)
.rescale
.map(word=>(word,1))
.setParallelism(2)
.print("test")
.setParallelism(2)
fsEnv.execute("FlinkWordCounts")
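To make the difference between rescale and rebalance concrete, here is a simplified plain Scala model of which downstream subtasks an upstream subtask sends to. It is not Flink code: `rescaleTargets` and `rebalanceTargets` are hypothetical helpers, the exact index assignment is an assumption, and the model only covers the case where one parallelism is an integer multiple of the other.

```scala
object RescaleSketch {
  // Downstream subtask indexes that upstream subtask `i` sends to under rescale.
  def rescaleTargets(upstream: Int, downstream: Int, i: Int): Seq[Int] =
    if (downstream >= upstream) {
      // fan-out: each upstream subtask owns its own block of downstream subtasks
      val fanOut = downstream / upstream
      (i * fanOut) until ((i + 1) * fanOut)
    } else {
      // fan-in: several upstream subtasks feed the same downstream subtask
      val fanIn = upstream / downstream
      Seq(i / fanIn)
    }

  // Under rebalance every upstream subtask cycles through all downstream subtasks.
  def rebalanceTargets(downstream: Int): Seq[Int] = 0 until downstream

  def main(args: Array[String]): Unit = {
    // parallelism 4 -> 2, as in the job above
    (0 until 4).foreach(i => println(s"upstream $i -> ${rescaleTargets(4, 2, i).mkString(",")}"))
    println(s"rebalance -> ${rebalanceTargets(2).mkString(",")}")
  }
}
```

Because each upstream subtask talks only to its own subset, rescale can avoid the all-to-all connections that rebalance needs.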

Broadcasting

Every upstream record is broadcast to all downstream partitions.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.broadcast
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.print("test")
fsEnv.execute("FlinkWordCounts")

Custom partitioning

A user-defined partitioner decides the target partition.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00",9999)
.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.partitionCustom(new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    // key.hashCode & Integer.MAX_VALUE clears the sign bit, so the result is never negative
    (key.hashCode&Integer.MAX_VALUE)%numPartitions
  }
},t=>t._1).print("test")
fsEnv.execute("Custom Partitions")
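The bit mask in the partitioner above matters because hashCode can be negative. The following plain Scala sketch compares the masked computation with a naive one based on math.abs; `safeIndex` and `naiveIndex` are hypothetical helper names introduced only for this comparison.

```scala
object PartitionIndexSketch {
  // Clears the sign bit first, so the result is always in [0, numPartitions).
  def safeIndex(hash: Int, numPartitions: Int): Int =
    (hash & Int.MaxValue) % numPartitions

  // Naive variant: math.abs(Int.MinValue) overflows and stays negative,
  // so this can return a negative (invalid) partition index.
  def naiveIndex(hash: Int, numPartitions: Int): Int =
    math.abs(hash) % numPartitions

  def main(args: Array[String]): Unit = {
    println(safeIndex(Int.MinValue, 3))
    println(naiveIndex(Int.MinValue, 3))
    println(safeIndex("hello".hashCode, 4))
  }
}
```

The naive variant fails only for the single hash value Int.MinValue, which makes the bug rare and hard to reproduce; masking removes the case entirely.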

State & Fault Tolerance (key topic)

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/

Using State in Flink

State in Flink falls into two broad categories: Keyed State and Operator State.

Keyed State: Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream. Each keyed state is bound to <keyed-parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
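The mapping from key to Key Group to parallel instance can be sketched in a few lines. This is a plain Scala model, not Flink's implementation: `keyGroupFor` and `operatorIndexFor` are hypothetical names, and a plain non-negative hashCode stands in for the hash function Flink applies internally.

```scala
object KeyGroupSketch {
  // Assumption: a masked hashCode stands in for Flink's internal hash function.
  // There are exactly `maxParallelism` key groups.
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    (key.hashCode & Int.MaxValue) % maxParallelism

  // Each parallel instance owns a contiguous range of key groups.
  def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    val group = keyGroupFor("flink", maxParallelism)
    // The key group of a key never changes; only its owner does when rescaling.
    println(s"key group $group, instance at parallelism 4: ${operatorIndexFor(group, maxParallelism, 4)}")
    println(s"key group $group, instance at parallelism 2: ${operatorIndexFor(group, maxParallelism, 2)}")
  }
}
```

Because keys are assigned to key groups independently of the current parallelism, changing the parallelism only moves whole key groups between instances, which is why key groups are the atomic unit of redistribution.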

Operator State: With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance.

Summary: in Flink both Keyed State and Operator State can exist in two forms, Managed State and Raw State. Managed State is recommended, because Flink can optimize how the state is stored and can redistribute the state during failure recovery.

Managed Keyed State

  • ValueState
object FlinkWordCountsValueState {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("HadoopNode00",9999)
      .flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .map(new ValueStateRichMapFunction)
      .printToErr("test")
    fsEnv.execute("FlinkWordCounts")
  }
}
class ValueStateRichMapFunction extends RichMapFunction[(String,Int),(String,Int)]{
  var historyCount:ValueState[Int]=_
  override def open(parameters: Configuration): Unit = {
    val vsd = new ValueStateDescriptor[Int]("wordcount",createTypeInformation[Int])
    historyCount=getRuntimeContext.getState[Int](vsd)
  }
  override def map(value: (String, Int)): (String, Int) = {
    val historyCountNum:Int = historyCount.value()
    val currentCount=historyCountNum+value._2
    historyCount.update(currentCount)
    (value._1,currentCount)
  }
}
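What "state scoped to the current key" means can be modelled without Flink. The sketch below is plain Scala: `KeyedCounter` is a hypothetical stand-in that keeps one value per key in a map, whereas in Flink the runtime selects the value for the current key automatically and persists it through the state backend.

```scala
import scala.collection.mutable

object KeyedValueStateSketch {
  // Hypothetical stand-in for a keyed ValueState[Int]: one value per key,
  // treated as 0 until something has been stored for that key.
  final class KeyedCounter {
    private val state = mutable.Map.empty[String, Int]

    // Mirrors the read, combine, update cycle of the map function above.
    def map(value: (String, Int)): (String, Int) = {
      val history = state.getOrElse(value._1, 0)
      val current = history + value._2
      state.update(value._1, current)
      (value._1, current)
    }
  }

  def main(args: Array[String]): Unit = {
    val counter = new KeyedCounter
    Seq("this", "is", "this").map(w => counter.map((w, 1))).foreach(println)
  }
}
```

Each key sees only its own history: the second occurrence of "this" yields a count of 2 while "is" remains at 1.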
  • ListState
object FlinkUserPasswordListState {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    //001 zhangsan 123456
    fsEnv.socketTextStream("HadoopNode00",9999)
      .map(line=>line.split("\\s+"))
      .map(tokens=>(tokens(0),tokens(1),tokens(2)))
      .keyBy(0)
      .map(new ListStateRichMapFunction)
      .printToErr("test")
    fsEnv.execute("FlinkWordCounts")
  }
}
class ListStateRichMapFunction extends RichMapFunction[(String,String,String),(String,String,String)]{
  var historyPasswords:ListState[String]=_
  override def open(parameters: Configuration): Unit = {
    val vsd = new ListStateDescriptor[String]("historyPasssword",createTypeInformation[String])
    historyPasswords=getRuntimeContext.getListState[String](vsd)
  }
  override def map(value: (String, String, String)): (String, String, String) = {
    historyPasswords.add(value._3)
    // asScala requires: import scala.collection.JavaConverters._
    val list = historyPasswords.get().asScala.toList
    (value._1,value._2,list.mkString(","))
  }
}
  • MapState
import scala.collection.JavaConverters._

object FlinkUserMapState {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Input format: 001 zhangsan fruit 4.5
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .map(line => line.split("\\s+"))
      .map(tokens => (tokens(0), tokens(1), tokens(2), tokens(3).toDouble))
      .keyBy(0)
      .map(new MapStateRichMapFunction)
      .printToErr("test")
    fsEnv.execute("FlinkUserMapState")
  }
}
class MapStateRichMapFunction extends RichMapFunction[(String, String, String, Double), (String, String, String)] {
  // Accumulated cost per category for the current user ID
  var historyMapState: MapState[String, Double] = _
  override def open(parameters: Configuration): Unit = {
    val msd = new MapStateDescriptor[String, Double]("mapstate", createTypeInformation[String], createTypeInformation[Double])
    historyMapState = getRuntimeContext.getMapState[String, Double](msd)
  }
  override def map(value: (String, String, String, Double)): (String, String, String) = {
    val cost = historyMapState.get(value._3) // 0.0 when the category is not present yet
    val currentCost = cost + value._4
    historyMapState.put(value._3, currentCost)
    (value._1, value._2, historyMapState.iterator().asScala.map(t => t.getKey + ":" + t.getValue).mkString(","))
  }
}
  • ReducingState
object FlinkWordCountReducingState {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Input: whitespace-separated words
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .flatMap(line => line.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .map(new ReducingStateRichMapFunction)
      .printToErr("test")
    fsEnv.execute("FlinkWordCountReducingState")
  }
}
class ReducingStateRichMapFunction extends RichMapFunction[(String, Int), (String, Int)] {
  var historyReducingState: ReducingState[Int] = _
  override def open(parameters: Configuration): Unit = {
    // The ReduceFunction folds every added value into the stored one
    val rsd = new ReducingStateDescriptor[Int]("reducecount", new ReduceFunction[Int] {
      override def reduce(v1: Int, v2: Int): Int = v1 + v2
    }, createTypeInformation[Int])
    historyReducingState = getRuntimeContext.getReducingState[Int](rsd)
  }
  override def map(value: (String, Int)): (String, Int) = {
    historyReducingState.add(value._2)
    (value._1, historyReducingState.get())
  }
}
  • AggregatingState
object FlinkUserAggregatingState {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Input format: 001 zhangsan 10000
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .map(line => line.split("\\s+"))
      .map(tokens => (tokens(0), tokens(1), tokens(2).toDouble))
      .keyBy(0)
      .map(new AggregatingStateRichMapFunction)
      .printToErr("test")
    fsEnv.execute("FlinkUserAggregatingState")
  }
}
class AggregatingStateRichMapFunction extends RichMapFunction[(String, String, Double), (String, String, Double)] {
  // Average salary for the current key; the accumulator is (sum, count)
  var avgSalaryState: AggregatingState[Double, Double] = _
  override def open(parameters: Configuration): Unit = {
    val asd = new AggregatingStateDescriptor[Double, (Double, Int), Double]("avgstate",
      new AggregateFunction[Double, (Double, Int), Double] {
        override def createAccumulator(): (Double, Int) = (0.0, 0)
        override def add(value: Double, accumulator: (Double, Int)): (Double, Int) = (accumulator._1 + value, accumulator._2 + 1)
        override def getResult(accumulator: (Double, Int)): Double = accumulator._1 / accumulator._2
        override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = (a._1 + b._1, a._2 + b._2)
      }, createTypeInformation[(Double, Int)])
    avgSalaryState = getRuntimeContext.getAggregatingState[Double, (Double, Int), Double](asd)
  }
  override def map(value: (String, String, Double)): (String, String, Double) = {
    avgSalaryState.add(value._3)
    (value._1, value._2, avgSalaryState.get())
  }
}

Managed Operator State √

To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface. Currently, list-style managed operator state is supported.

  • CheckpointedFunction

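public interface CheckpointedFunction {
    // Called when a checkpoint is taken; the user writes the state to be snapshotted through the context
    void snapshotState(FunctionSnapshotContext context) throws Exception;
    // Called to initialize the state, both on first start and on recovery from a failure
    void initializeState(FunctionInitializationContext context) throws Exception;
}

Restoring state after a failure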

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

class UserDefineBufferSink(threshold: Int) extends SinkFunction[String] with CheckpointedFunction {
  @transient
  private var checkpointedState: ListState[String] = _
  private val bufferedElements = ListBuffer[String]()

  // Buffer elements and flush them once the threshold is reached
  override def invoke(value: String): Unit = {
    bufferedElements += value
    if (bufferedElements.size >= threshold) {
      for (e <- bufferedElements) {
        println("Element:" + e)
      }
      bufferedElements.clear()
    }
  }
  // Copy the elements that are not flushed yet into operator state
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (e <- bufferedElements) {
      checkpointedState.add(e)
    }
  }
  override def initializeState(context: FunctionInitializationContext): Unit = {
    println("initializeState...")
    val lsd = new ListStateDescriptor[String]("list", createTypeInformation[String])
    // getUnionListState restores with union redistribution; use getListState(lsd) for even-split
    checkpointedState = context.getOperatorStateStore.getUnionListState(lsd)
    if (context.isRestored) {
      val list = checkpointedState.get().asScala
      println("State Restore :" + list.mkString(","))
      for (e <- list) {
        bufferedElements += e
      }
    }
  }
}
object FlinkWordPrint {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .flatMap(line => line.split("\\s+"))
      .addSink(new UserDefineBufferSink(5))
      .uid("wordsink") // tag the stateful operator; the uid must be unique (recommended)
    fsEnv.execute("FlinkWordPrint")
  }
}
  • ListCheckpointed

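public interface ListCheckpointed<T extends Serializable> {
    // On a checkpoint the user only returns the List to be snapshotted; the system takes the snapshot
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    // Called on recovery from a failure
    void restoreState(List<T> state) throws Exception;
}

Both interfaces above are used to store and restore Operator State. The difference is that CheckpointedFunction supports two redistribution schemes when state is restored, even-split or union, whereas ListCheckpointed supports only even-split.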

import java.lang.{Long => JLong}
import java.util
import scala.collection.JavaConverters._

class UserDefineCounterSource extends RichParallelSourceFunction[Long] with ListCheckpointed[JLong] {
  @volatile
  private var isRunning = true
  private var offset = 0L

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
    val v: JLong = offset
    List(v).asJava
  }
  override def restoreState(state: util.List[JLong]): Unit = {
    for (v <- state.asScala) {
      println("restoreState:" + v)
      offset = v
    }
  }
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      Thread.sleep(1000)
      // Emit and advance the offset under the checkpoint lock so a snapshot never sees a half-updated offset
      lock.synchronized({
        ctx.collect(offset)
        offset += 1
      })
    }
  }
  override def cancel(): Unit = isRunning = false
}
object FlinkCountPrint {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.addSource[Long](new UserDefineCounterSource)
      .uid("countsource")
      .print()
    fsEnv.execute("FlinkCountPrint")
  }
}
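
The two redistribution schemes named above can be pictured with plain Scala collections. This is only a sketch under assumptions: RedistributionSketch is a hypothetical name, the lists stand in for operator list state, and the round-robin assignment used for even-split is an assumption of this sketch (Flink only promises that the concatenated list is divided evenly).

```scala
// Illustrative model only: plain collections stand in for Flink's operator list state.
object RedistributionSketch {
  // Even-split: the concatenation of all checkpointed lists is divided
  // across the new parallel instances (round-robin here, by assumption).
  def evenSplit[T](checkpointed: Seq[Seq[T]], newParallelism: Int): Seq[Seq[T]] = {
    val all = checkpointed.flatten
    (0 until newParallelism).map { i =>
      all.zipWithIndex.collect { case (e, idx) if idx % newParallelism == i => e }
    }
  }

  // Union: every new parallel instance receives the complete concatenated list.
  def union[T](checkpointed: Seq[Seq[T]], newParallelism: Int): Seq[Seq[T]] = {
    val all = checkpointed.flatten
    Seq.fill(newParallelism)(all)
  }
}
```

With union redistribution each instance has to pick out the entries it is responsible for, which is why even-split is usually the simpler choice.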

State Time-To-Live (TTL)

Basic usage (√)

A time-to-live (TTL) can be assigned to the keyed state of any type. If a TTL is configured and a state value has expired, the stored value will be cleaned up on a best effort basis which is discussed in more detail below.

All state collection types support per-entry TTLs. This means that list elements and map entries expire independently.

To use TTL, build a StateTtlConfig object and pass it to the enableTimeToLive method of the XxxStateDescriptor:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(10)) // required: the time-to-live, here 10 s
  // optional: OnCreateAndWrite (default) | OnReadAndWrite
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  // optional: NeverReturnExpired (default) | ReturnExpiredIfNotCleanedUp
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

Example

object FlinkWordCountsValueStateWithTTL {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .map(new ValueStateRichMapFunction)
      .printToErr("test")
    fsEnv.execute("FlinkWordCountsValueStateWithTTL")
  }
}
class ValueStateRichMapFunction extends RichMapFunction[(String, Int), (String, Int)] {
  var historyCount: ValueState[Int] = _
  override def open(parameters: Configuration): Unit = {
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
    // A count that is not written for 10 s expires and restarts from 0
    val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build
    vsd.enableTimeToLive(ttlConfig)
    historyCount = getRuntimeContext.getState[Int](vsd)
  }
  override def map(value: (String, Int)): (String, Int) = {
    val historyCountNum: Int = historyCount.value()
    val currentCount = historyCountNum + value._2
    historyCount.update(currentCount)
    (value._1, currentCount)
  }
}

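Notes

1. Enabling TTL increases state storage consumption, because every state value additionally stores an 8-byte long timestamp.

2. If state was written without TTL and the job is then restored with a TTL-enabled descriptor (or the other way round), the restore fails because the state is incompatible.

3. TTL is currently measured in processing time, that is, the clock of the node that processes the state.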
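
The effect of the update type and of NeverReturnExpired can be modelled without Flink. The sketch below is an assumption-laden illustration, not Flink's implementation: TtlSketch and TtlValue are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic.

```scala
// Illustrative model of TTL semantics; not Flink's implementation.
object TtlSketch {
  sealed trait UpdateType
  case object OnCreateAndWrite extends UpdateType
  case object OnReadAndWrite extends UpdateType

  // A single TTL-enabled value; `now` is the processing time in milliseconds.
  final class TtlValue[T](ttlMillis: Long, updateType: UpdateType) {
    private var entry: Option[(T, Long)] = None // (value, last-access timestamp)

    // Every write refreshes the timestamp.
    def update(value: T, now: Long): Unit = entry = Some((value, now))

    // NeverReturnExpired visibility: an expired value is treated as absent.
    def value(now: Long): Option[T] = entry match {
      case Some((v, ts)) if now - ts < ttlMillis =>
        // With OnReadAndWrite a successful read also refreshes the timestamp.
        if (updateType == OnReadAndWrite) entry = Some((v, now))
        Some(v)
      case _ =>
        entry = None // expired state is dropped lazily, on access
        None
    }
  }
}
```

With OnCreateAndWrite a value written at time 0 with a 10 s TTL is gone at 10 s no matter how often it was read; with OnReadAndWrite each read pushes the expiry out by another 10 s.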

Cleanup of Expired State

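By default Flink does not proactively delete expired state. The expiry check runs, and the expired value is removed, only when that state is accessed. State that is rarely accessed may therefore have expired long ago and still occupy Flink's memory.

  • Cleanup in full snapshot

With this strategy, expired entries are dropped when a full state snapshot is taken, so they are absent once the job is restored from that snapshot. The local state of the running job is not cleaned up. In practice, operations staff can only reclaim memory by periodically creating a savepoint or checkpoint and then restoring the job from it.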

val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot.build

Note: This option is not applicable for the incremental checkpointing in the RocksDB state backend.

  • Cleanup in background

Besides cleanupFullSnapshot, which drops expired data when a snapshot is taken, the cleanupInBackground strategy can also be enabled. It automatically selects a background cleanup mode that matches the state backend in use.

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))
  // defaults: incremental cleanup (5, false) on heap backends | compaction filter (1000) on RocksDB
  .cleanupInBackground
  .build

Flink's state backends currently fall into two groups: heap state backends and the RocksDB backend. Heap state backends use incremental cleanup, and the RocksDB backend uses the compaction filter cleanup strategy.

  • Incremental cleanup
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))
  // Check 5 state entries per cleanup step. false: a step is triggered only on state access;
  // true: a step is additionally triggered for every processed record.
  .cleanupIncrementally(5, false)
  .build
  • Cleanup during RocksDB compaction
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))
  // During compaction the filter re-reads the current timestamp after every 1000 processed
  // state entries and removes the entries that have expired.
  .cleanupInRocksdbCompactFilter(1000)
  .build

Note that the compaction filter feature must additionally be enabled, in one of two ways:

1. Add the following setting to flink-conf.yaml

state.backend.rocksdb.ttl.compaction.filter.enabled: true

2. Or enable it through the API

RocksDBStateBackend::enableTtlCompactionFilter

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val rocksDBStateBackend = new RocksDBStateBackend("hdfs:///rockdbs-statebackend")
rocksDBStateBackend.enableTtlCompactionFilter()
fsEnv.setStateBackend(rocksDBStateBackend)
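
The idea behind incremental cleanup (cleanupIncrementally) in this section can be sketched in plain Scala: every state access advances a lazy cursor over the stored entries by a bounded number of steps and drops whatever it finds expired. IncrementalCleanupSketch is a hypothetical class, and the key snapshot it iterates over is a simplification chosen for this sketch rather than how a heap backend really iterates.

```scala
import scala.collection.mutable

// Illustrative model of incremental cleanup; names and structure are assumptions.
final class IncrementalCleanupSketch(ttlMillis: Long, cleanupSize: Int) {
  private val store = mutable.LinkedHashMap.empty[String, (Int, Long)] // key -> (value, timestamp)
  private var cursor: Iterator[String] = Iterator.empty

  // Every write counts as a state access and triggers one cleanup step.
  def put(key: String, value: Int, now: Long): Unit = {
    store.update(key, (value, now))
    cleanupStep(now)
  }

  def size: Int = store.size

  // One step inspects at most `cleanupSize` entries and removes the expired ones.
  private def cleanupStep(now: Long): Unit = {
    if (!cursor.hasNext) cursor = store.keys.toList.iterator // restart over a snapshot of the keys
    var checked = 0
    while (checked < cleanupSize && cursor.hasNext) {
      val k = cursor.next()
      if (store.get(k).exists { case (_, ts) => now - ts >= ttlMillis }) store.remove(k)
      checked += 1
    }
  }
}
```

The bounded step is the point of the design: cleanup cost is spread over normal traffic, at the price that rarely touched state lingers if there is no traffic at all.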

Broadcast State

DataStream -> BroadcastStream

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// Order stream: 001 zhangsan baby 200
val userOrderItem = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (tokens(0), tokens(1), tokens(2), tokens(3).toDouble))
// Rule stream: baby 150 (orders in this category that reach the threshold enter the lottery)
val configStream = fsEnv.socketTextStream("HadoopNode00", 8888)
  .map(line => line.split("\\s+"))
  .map(tokens => (tokens(0), tokens(1).toDouble))
val ruleMapStateDescriptor = new MapStateDescriptor[String, Double]("bmsd", createTypeInformation[String], createTypeInformation[Double])
// Broadcast the rule stream as state to every parallel instance
userOrderItem.connect(configStream.broadcast(ruleMapStateDescriptor))
  .process(new UserDefineBroadcastProcessFunction(ruleMapStateDescriptor))
  .print()
fsEnv.execute("FlinkBroadCastState")

import scala.util.Random

class UserDefineBroadcastProcessFunction(msd: MapStateDescriptor[String, Double])
  extends BroadcastProcessFunction[(String, String, String, Double), (String, Double), (String, String, Double)] {
  // Non-broadcast side: the broadcast state is read-only here; this side produces the output
  override def processElement(value: (String, String, String, Double),
      ctx: BroadcastProcessFunction[(String, String, String, Double), (String, Double), (String, String, Double)]#ReadOnlyContext,
      out: Collector[(String, String, Double)]): Unit = {
    val readOnlyState = ctx.getBroadcastState(msd)
    if (readOnlyState.contains(value._3)) {
      val threshold = readOnlyState.get(value._3)
      if (value._4 >= threshold) {
        val random = new Random().nextInt(10)
        out.collect((value._1, value._3, random * 1.0)) // emit downstream
      }
    }
  }
  // Broadcast side: read-write access; updates the broadcast state
  override def processBroadcastElement(value: (String, Double),
      ctx: BroadcastProcessFunction[(String, String, String, Double), (String, Double), (String, String, Double)]#Context,
      out: Collector[(String, String, Double)]): Unit = {
    val stateTobeBroadcast = ctx.getBroadcastState(msd)
    stateTobeBroadcast.put(value._1, value._2)
  }
}

KeyedStream -> BroadcastStream

def main(args: Array[String]): Unit = {
  val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
  // Login stream: 001 zhangsan
  val userLoginStream = fsEnv.socketTextStream("HadoopNode00", 9999)
    .map(line => line.split("\\s+"))
    .map(tokens => (tokens(0), tokens(1)))
    .keyBy(t => t._1)
  // Order stream: 001 zhangsan 100.0, reduced to a running total per user
  val orderStream = fsEnv.socketTextStream("HadoopNode00", 8888)
    .map(line => line.split("\\s+"))
    .map(tokens => (tokens(0), tokens(1), tokens(2).toDouble))
    .keyBy(0)
    .sum(2)
  val ruleMapStateDescriptor = new MapStateDescriptor[String, Double]("bmsd", createTypeInformation[String], createTypeInformation[Double])
  // Broadcast the order totals as state
  userLoginStream.connect(orderStream.broadcast(ruleMapStateDescriptor))
    .process(new UserDefineKeyedBroadcastProcessFunction(ruleMapStateDescriptor))
    .print()
  fsEnv.execute("FlinkBroadCastState")
}
class UserDefineKeyedBroadcastProcessFunction(msd: MapStateDescriptor[String, Double])
  extends KeyedBroadcastProcessFunction[String, (String, String), (String, String, Double), (String, String, String)] {
  // Keyed side: look up the user's total spending and emit a membership tier
  override def processElement(value: (String, String),
      ctx: KeyedBroadcastProcessFunction[String, (String, String), (String, String, Double), (String, String, String)]#ReadOnlyContext,
      out: Collector[(String, String, String)]): Unit = {
    val userID = ctx.getCurrentKey
    val readOnlyState = ctx.getBroadcastState(msd)
    val historyCost = readOnlyState.get(userID) // 0.0 when the user has no orders yet
    if (historyCost > 10000) {
      out.collect((value._1, value._2, "gold"))
    } else if (historyCost > 1000) {
      out.collect((value._1, value._2, "silver"))
    } else if (historyCost > 500) {
      out.collect((value._1, value._2, "bronze"))
    } else {
      out.collect((value._1, value._2, "iron"))
    }
  }
  // Broadcast side: store the latest total per user
  override def processBroadcastElement(value: (String, String, Double),
      ctx: KeyedBroadcastProcessFunction[String, (String, String), (String, String, Double), (String, String, String)]#Context,
      out: Collector[(String, String, String)]): Unit = {
    val state = ctx.getBroadcastState(msd)
    state.put(value._1, value._3)
  }
}

Checkpoint&SavePoint&State Backend

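Concepts

A checkpoint is a mechanism in which the JobManager periodically issues a checkpoint command in the form of a barrier. When a task receives the barrier, it persists its own state to the state backend. Once every task of the job has persisted its state, the JobManager marks the checkpoint as successful and deletes the previous one. Checkpointing is disabled by default and must be enabled explicitly by the user.

A savepoint is a manually triggered checkpoint: operations staff specify a savepoint directory, typically before stopping a job. Internally the system still creates a checkpoint, but a checkpoint created as a savepoint is never deleted automatically.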
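
The completion rule for a checkpoint described above (successful only after every task has persisted its state) can be sketched as a set of outstanding acknowledgements. PendingCheckpointSketch and the task names in the usage are hypothetical; this models the bookkeeping only and is not Flink's coordinator code.

```scala
// Illustrative model: a checkpoint completes only after every task has acknowledged it.
final class PendingCheckpointSketch(val checkpointId: Long, tasks: Set[String]) {
  private var notYetAcknowledged: Set[String] = tasks

  // Records one acknowledgement; returns true once no task is outstanding.
  def acknowledge(task: String): Boolean = {
    notYetAcknowledged -= task
    notYetAcknowledged.isEmpty
  }

  def isComplete: Boolean = notYetAcknowledged.isEmpty
}
```

For a job with tasks source, map and sink, the checkpoint stays pending after the first two acknowledgements and completes with the third.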

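Whether it is a checkpoint or a savepoint, the state data is ultimately stored in a state backend. Flink currently provides three state backends.

  • MemoryStateBackend (for testing): when a checkpoint is taken, the state is snapshotted and sent to the JobManager, where it is stored in the memory of that single node. This is the default backend.

    1. By default a single state may not exceed 5 MB.

    2. The aggregate state must fit into the JobManager's memory.

    fsEnv.setStateBackend(new MemoryStateBackend(1024*1024*5,true) ) // 5 MB, asynchronous snapshots
    
  • FsStateBackend: keeps the working state in the memory of the TaskManagers (multiple nodes). When a checkpoint is taken, the state snapshot is written to the configured file system path.

    1. Use it when the cluster holds large state (compared with a single node).

    2. Recommended for all production environments.

    fsEnv.setStateBackend(new FsStateBackend("hdfs:///xxxx",true) ) // xxxx is a placeholder path; true = asynchronous snapshots
    
  • RocksDBStateBackend: every TaskManager holds a local RocksDB database (memory + disk) that stores the state. When a checkpoint is taken, the RocksDB data is written to the configured file system path.

    1. Use it for very large state (larger than a memory-based cluster can hold).

    2. Recommended for all production environments.

    3. Limited by RocksDB itself: a single key or value may not exceed 2^31 bytes (about 2 GB).

    fsEnv.setStateBackend(new RocksDBStateBackend("hdfs:///xxxx",true) ) // xxxx is a placeholder path; true = incremental checkpoints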
    

flink-conf.yaml - global configuration

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true

Example

object FlinkWordCountsCheckpoint {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint interval of 5 s with exactly-once semantics
    fsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    // A checkpoint must complete within 4 s; otherwise it is abandoned
    fsEnv.getCheckpointConfig.setCheckpointTimeout(4000)
    // At least 2 s must pass between two checkpoints; takes precedence over the checkpoint interval
    fsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
    // Fail the job if a checkpoint fails
    fsEnv.getCheckpointConfig.setFailOnCheckpointingErrors(true)
    // Whether checkpoint data is deleted when the job is cancelled. RETAIN_ON_CANCELLATION is
    // recommended: the checkpoint is kept when the user cancels without taking a savepoint
    fsEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .map(new WordCountMapFunction())
      .uid("wordcount")
      .print("test")
    fsEnv.execute("FlinkWordCountsCheckpoint")
  }
}
class WordCountMapFunction extends RichMapFunction[(String, Int), (String, Int)] {
  var wcs: ValueState[Int] = _
  override def open(parameters: Configuration): Unit = {
    val wcvsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
    wcs = getRuntimeContext.getState(wcvsd)
  }
  override def map(value: (String, Int)): (String, Int) = {
    val count = wcs.value()
    wcs.update(count + value._2)
    (value._1, wcs.value())
  }
}
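
How the checkpoint interval and the minimum pause interact in the example above can be reduced to one line of arithmetic. The sketch below is a simplified model under the assumption that the next checkpoint starts at the later of "interval after the last start" and "minimum pause after the last completion"; CheckpointScheduleSketch and nextTriggerTime are hypothetical names, not Flink API.

```scala
// Simplified scheduling model; all times are in milliseconds.
object CheckpointScheduleSketch {
  // Earliest time the next checkpoint may start under this model.
  def nextTriggerTime(lastTrigger: Long, lastCompletion: Long,
                      intervalMs: Long, minPauseMs: Long): Long =
    math.max(lastTrigger + intervalMs, lastCompletion + minPauseMs)
}
```

With the values from the example (interval 5000, minimum pause 2000), a checkpoint that started at 0 and finished at 500 is followed by one at 5000, while one that finished at 3900 pushes the next start to 5900.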

Task Failure Recovery

RestartStrategy

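A restart strategy describes whether and when a job is restarted after a failure. Flink currently provides the following strategies:

  • noRestart - the job fails without any restart attempt.

  • fixedDelayRestart - a fixed number of restart attempts, with a configurable delay between attempts.

    // at most 5 restart attempts, 5 seconds apart
    fsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.seconds(5)))
    
  • failureRateRestart - the job is restarted after each failure until the failure rate (failures per time interval) is exceeded, at which point the job is considered failed.

    // at most 5 failures within 1 minute, 5 seconds between restart attempts
    fsEnv.setRestartStrategy(RestartStrategies.failureRateRestart(5,Time.minutes(1),Time.seconds(5)))
    
  • fallBackRestart - uses the restart strategy configured for the cluster. If the cluster configures none, fixedDelayRestart is used when checkpointing is enabled; without checkpointing the job is not restarted.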

    fsEnv.setRestartStrategy(RestartStrategies.fallBackRestart())
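
The decision each strategy makes after a failure can be written down as two small predicates. This is a sketch under assumptions rather than Flink's implementation: RestartStrategySketch and its parameter names are hypothetical, and the failure-rate rule is modelled as "count the failures inside the sliding interval, including the current one".

```scala
// Illustrative decision rules; times are in milliseconds.
object RestartStrategySketch {
  // fixedDelayRestart(maxAttempts, delay): restart until the attempt budget is used up.
  def fixedDelayCanRestart(restartsSoFar: Int, maxAttempts: Int): Boolean =
    restartsSoFar < maxAttempts

  // failureRateRestart(maxFailures, interval, delay): restart unless the number of
  // failures inside the interval (including the current one) exceeds maxFailures.
  def failureRateCanRestart(failureTimesMs: Seq[Long], nowMs: Long,
                            maxFailuresPerInterval: Int, intervalMs: Long): Boolean = {
    val recent = failureTimesMs.count(t => nowMs - t < intervalMs)
    recent <= maxFailuresPerInterval
  }
}
```

The practical difference: a fixed-delay budget is never refilled, whereas under the failure-rate rule old failures age out of the interval, so a long-running job can survive occasional failures indefinitely.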
    

Failover Strategies

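This setting controls how the system recovers from a task failure. Flink currently supports two failover strategies: region (restart only the affected region) and full (restart the whole job). It is configured in flink-conf.yaml: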

jobmanager.execution.failover-strategy: region

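Window computation - the core of stream processing

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html

Overview

Window computation is the core of stream processing. A window cuts an unbounded data stream along the time axis into buckets of finite size, and the computation is then applied to each bucket. Depending on the kind of stream, Flink divides window computation into two categories.

  • Keyed Windows

    stream
      .keyBy(...)                    <-  group the data by key
      .window(...)                   <-  required: "assigner", how elements are assigned to windows
      [.trigger(...)]                <-  optional: "trigger" (every window has a default one), when the window fires
      [.evictor(...)]                <-  optional: "evictor", removes elements from the window before or after aggregation
      [.allowedLateness(...)]        <-  optional: "lateness" (no lateness allowed by default), how late elements may arrive (event time)
      [.sideOutputLateData(...)]     <-  optional: "output tag", routes elements that are too late to a side output
      .reduce/aggregate/fold/apply() <-  required: "function", performs the window aggregation
      [.getSideOutput(...)]          <-  optional: "output tag", retrieves the elements that were too late
    
  • Non-Keyed Windows

    stream
      .windowAll(...)                <-  required: "assigner", how elements are assigned to windows
      [.trigger(...)]                <-  optional: "trigger" (every window has a default one), when the window fires
      [.evictor(...)]                <-  optional: "evictor", removes elements from the window before or after aggregation
      [.allowedLateness(...)]        <-  optional: "lateness" (no lateness allowed by default), how late elements may arrive (event time)
      [.sideOutputLateData(...)]     <-  optional: "output tag", routes elements that are too late to a side output
      .reduce/aggregate/fold/apply() <-  required: "function", performs the window aggregation
      [.getSideOutput(...)]          <-  optional: "output tag", retrieves the elements that were too late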
    

Window Lifecycle

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows.

In addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents any time between its creation and removal. Purging in this case only refers to the elements in the window, and not the window metadata. This means that new data can still be added to that window.

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.

Window Assigners

Tumbling Windows (time-based)

The window length equals the slide, so windows never overlap.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00", 9999)
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
  .print()
fsEnv.execute("FlinkWordCountsTumblingWindow_ReduceFunction")
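
Which tumbling window an element falls into follows from simple modular arithmetic on its timestamp. The sketch below uses the same formula as Flink's TimeWindow.getWindowStartWithOffset; the wrapper object TumblingWindowSketch and the assign helper are hypothetical names introduced here for illustration.

```scala
// Window assignment arithmetic for tumbling windows; times are in milliseconds.
object TumblingWindowSketch {
  // Start of the window that contains `timestamp`.
  def windowStart(timestamp: Long, size: Long, offset: Long = 0L): Long =
    timestamp - (timestamp - offset + size) % size

  // A tumbling window is the half-open interval [start, start + size).
  def assign(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size)
  }
}
```

For a 5 s window, an element stamped 12345 lands in [10000, 15000), and an element stamped exactly 5000 opens the next window [5000, 10000).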

Sliding Windows (time-based)

The window length is normally greater than or equal to the slide; if it is smaller, elements that fall between two windows are dropped. When the length exceeds the slide, consecutive windows overlap.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00", 9999)
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
  .aggregate(new AggregateFunction[(String, Int), (String, Int), (String, Int)] {
    override def createAccumulator(): (String, Int) = ("", 0)
    override def add(value: (String, Int), accumulator: (String, Int)): (String, Int) = {
      (value._1, value._2 + accumulator._2)
    }
    override def getResult(accumulator: (String, Int)): (String, Int) = {
      accumulator
    }
    override def merge(a: (String, Int), b: (String, Int)): (String, Int) = {
      (a._1, a._2 + b._2)
    }
  })
  .print()
fsEnv.execute("FlinkWordCountsSlidingWindow_AggregateFunction")
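
The overlap, and the data loss when the length is smaller than the slide, both become visible when the windows of a single element are enumerated. The sketch follows the loop used by Flink's sliding window assigners (start at the latest window start and step back by the slide); SlidingWindowSketch is a hypothetical name and the offset is assumed to be 0.

```scala
// Enumerates the sliding windows [start, start + size) that contain a timestamp (ms).
object SlidingWindowSketch {
  def assign(timestamp: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = timestamp - (timestamp + slide) % slide // latest window start <= timestamp
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > timestamp - size) // stop once the window no longer covers the timestamp
      .map(start => (start, start + size))
      .toList
  }
}
```

With length 4 s and slide 2 s an element at 5000 belongs to two windows. With length 2 s and slide 4 s an element at 3500 belongs to none, which is the dropped-data case mentioned above.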

Session Windows (time-based)

Every element initially creates its own window. If the gap between two windows is smaller than the configured window gap, the system merges them. Unlike the two window types above, session windows are therefore also called mergeable windows, and their length is not fixed.

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00", 9999)
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(t => t._1)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
  .apply(new WindowFunction[(String, Int), String, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
      val start = window.getStart
      val end = window.getEnd
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // Emit the session bounds followed by the elements of the session
      out.collect(sdf.format(start) + " ~ " + sdf.format(end) + "\t" + input.mkString(","))
    }
  })
  .print()
fsEnv.execute("FlinkWordCountsSessionWindow_WindowFunction")
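
The merge rule for session windows can be illustrated on a list of timestamps for one key: each element opens the window [t, t + gap), and windows that touch or overlap are merged. SessionWindowSketch is a hypothetical helper that processes the timestamps in sorted order, which a real stream does not guarantee.

```scala
// Builds session windows [start, end) from element timestamps (ms) of a single key.
object SessionWindowSketch {
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      // The element falls inside (or touches) the current session: extend it.
      case ((start, end) :: rest, t) if t <= end => (start, math.max(end, t + gap)) :: rest
      // Otherwise the element opens a new session.
      case (acc, t) => (t, t + gap) :: acc
    }.reverse
}
```

With a 5 s gap, elements at 1000 and 3000 form one session [1000, 8000), and an element at 9000 starts a new one.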

Global Windows (not time-based)

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00", 9999)
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(t => t._1)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(2)) // a global window never fires on its own, so a trigger is required
  .fold(("", 0))((z, v) => (v._1, z._2 + v._2))
  .print()
fsEnv.execute("FlinkWordCountsGlobalWindow_FoldFunction")

Window Functions

After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing (see triggers for how Flink determines when a window is ready).

ReduceFunction

val input: DataStream[(String, Long)] = ...
input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

AggregateFunction

class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  // Accumulator: (sum, count)
  override def createAccumulator(): (Long, Long) = (0L, 0L)
  override def add(value: (String, Long), accumulator: (Long, Long)): (Long, Long) =
    (accumulator._1 + value._2, accumulator._2 + 1L)
  // Convert to Double before dividing so the average is not truncated
  override def getResult(accumulator: (Long, Long)): Double = accumulator._1.toDouble / accumulator._2
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate)
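
The four methods of an AggregateFunction are easier to follow when they are traced by hand on a small input. The sketch below replays the accumulator lifecycle of an average in plain Scala; AverageSketch is a hypothetical object whose method names mirror the interface, and it does not depend on Flink.

```scala
// Accumulator lifecycle of an average, without Flink.
object AverageSketch {
  type Acc = (Long, Long) // (sum, count)

  def createAccumulator(): Acc = (0L, 0L)
  def add(value: Long, acc: Acc): Acc = (acc._1 + value, acc._2 + 1L)
  def merge(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)
  def getResult(acc: Acc): Double = acc._1.toDouble / acc._2

  // Incremental aggregation: only the accumulator is kept, never the elements themselves.
  def average(values: Seq[Long]): Double =
    getResult(values.foldLeft(createAccumulator())((acc, v) => add(v, acc)))
}
```

merge matters for mergeable windows such as session windows: two partial accumulators (1, 1) and (4, 1) combine to (5, 2), giving 2.5.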

FoldFunction

val input: DataStream[(String, Long)] = ...
input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .fold("") { (acc, v) => acc + v._2 }

fold() cannot be used with session windows or other mergeable windows.

WindowFunction (Legacy)

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
val input: DataStream[(String, Long)] = ...
input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .apply(new MyWindowFunction())

ProcessWindowFunction

ProcessWindowFunction is the replacement for WindowFunction, because it can additionally work with per-window state (WindowState) and GlobalState. However, since it operates on the raw window contents, which have to be buffered until the window fires, it is less efficient than a ReduceFunction or an AggregateFunction.

object FlinkWordCountsTumblingWindow_ProcessWindowFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(t => t._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new UserDefineProcessWindowFunction)
      .print()
    fsEnv.execute("FlinkWordCountsTumblingWindow_ProcessWindowFunction")
  }
}
class UserDefineProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Int, Int), String, TimeWindow] {
  var windowStateDescriptor: ValueStateDescriptor[Int] = _
  var globalStateDescriptor: ValueStateDescriptor[Int] = _
  override def open(parameters: Configuration): Unit = {
    windowStateDescriptor = new ValueStateDescriptor[Int]("wc", createTypeInformation[Int])
    globalStateDescriptor = new ValueStateDescriptor[Int]("gc", createTypeInformation[Int])
  }
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int, Int)]): Unit = {
    val wstate = context.windowState.getState(windowStateDescriptor) // scoped to this key and window
    val gstate = context.globalState.getState(globalStateDescriptor) // scoped to this key across all windows
    val windowCount = elements.map(_._2).sum
    wstate.update(wstate.value() + windowCount)
    gstate.update(gstate.value() + windowCount)
    out.collect((key, wstate.value(), gstate.value()))
  }
}

To reduce this cost, a ProcessWindowFunction can be combined with a ReduceFunction or an AggregateFunction: elements are aggregated incrementally as they arrive, and the ProcessWindowFunction receives only the aggregated result when the window fires.

object FlinkWordCountsTumblingWindow_ProcessWindowFunctionAndReduceFunction {
  def main(args: Array[String]): Unit = {
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    fsEnv.socketTextStream("HadoopNode00", 9999)
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(t => t._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((v1: (String, Int), v2: (String, Int)) => (v1._1, v2._2 + v1._2), new UserDefineProcessWindowFunction1)
      .print()
    fsEnv.execute("FlinkWordCountsTumblingWindow_ProcessWindowFunctionAndReduceFunction")
  }
}
class UserDefineProcessWindowFunction1 extends ProcessWindowFunction[(String, Int), (String, Int, Int), String, TimeWindow] {
  var windowStateDescriptor: ValueStateDescriptor[Int] = _
  var globalStateDescriptor: ValueStateDescriptor[Int] = _
  override def open(parameters: Configuration): Unit = {
    windowStateDescriptor = new ValueStateDescriptor[Int]("wc", createTypeInformation[Int])
    globalStateDescriptor = new ValueStateDescriptor[Int]("gc", createTypeInformation[Int])
  }
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int, Int)]): Unit = {
    val wstate = context.windowState.getState(windowStateDescriptor)
    val gstate = context.globalState.getState(globalStateDescriptor)
    val list = elements.toList // contains only the single pre-reduced element
    println("list:" + list)
    val windowCount = list.map(_._2).sum
    wstate.update(wstate.value() + windowCount)
    gstate.update(gstate.value() + windowCount)
    out.collect((key, wstate.value(), gstate.value()))
  }
}

Triggers

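A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Flink's Trigger interface provides five important methods for reacting to events.

  • TriggerResult onElement - called for every element that is added to the window.
  • TriggerResult onProcessingTime - called when a registered processing-time timer fires.
  • TriggerResult onEventTime - called when a registered event-time timer fires.
  • onMerge - called when windows are merged, for example with session windows.
  • clear() - called when the window is removed.

When overriding a Trigger, pay attention to onElement, onProcessingTime and onEventTime: they return a TriggerResult, which decides whether the window is ready. TriggerResult takes one of the following values:

  • CONTINUE: the window is not ready; nothing happens.
  • FIRE: the window is ready and the window function is evaluated.
  • PURGE: the elements in the window are cleared without evaluating the window function.
  • FIRE_AND_PURGE: the window function is evaluated, and afterwards the elements in the window are cleared.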
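
What each TriggerResult does to a window can be spelled out as a function from the result and the current window contents to "what the window function sees" and "what stays in the window". TriggerResultSketch is a hypothetical model for illustration; the four case objects only mirror the names of Flink's enum.

```scala
// Illustrative model of how a TriggerResult affects a window's contents.
object TriggerResultSketch {
  sealed trait TriggerResult
  case object CONTINUE extends TriggerResult
  case object FIRE extends TriggerResult
  case object PURGE extends TriggerResult
  case object FIRE_AND_PURGE extends TriggerResult

  // Returns (elements handed to the window function, if any; window contents afterwards).
  def handle(result: TriggerResult, contents: List[Int]): (Option[List[Int]], List[Int]) =
    result match {
      case CONTINUE       => (None, contents)
      case FIRE           => (Some(contents), contents) // contents are kept for later firings
      case PURGE          => (None, Nil)
      case FIRE_AND_PURGE => (Some(contents), Nil)
    }
}
```

The difference between FIRE and FIRE_AND_PURGE shows up on the next firing: after FIRE the earlier elements are evaluated again together with the new ones.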

Each WindowAssigner comes with a default Trigger.

Window type                                          Default trigger         Description
Processing-time windows (tumbling/sliding/session)   ProcessingTimeTrigger   Fires when the system time reaches the window end time
Event-time windows (tumbling/sliding/session)        EventTimeTrigger        Fires when the watermark is greater than or equal to the window end time
GlobalWindow                                         NeverTrigger            Never fires; a custom trigger must be specified

Custom trigger

public class UserDefineCountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private UserDefineCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // Count the element; fire and reset the counter once maxCount is reached
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "UserDefineCountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> UserDefineCountTrigger<W> of(long maxCount) {
        return new UserDefineCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

Evictors

Flink’s windowing model allows specifying an optional Evictor in addition to the WindowAssigner and the Trigger. This can be done using the evictor(...) method (shown in the beginning of this document). The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

public class UserDefineErrorEvictor implements Evictor<String, GlobalWindow> {
    private Boolean isBeforeEvictor;
    public UserDefineErrorEvictor(Boolean isBeforeEvictor) {
        this.isBeforeEvictor = isBeforeEvictor;
    }
    @Override
    public void evictBefore(Iterable<TimestampedValue<String>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
        if (isBeforeEvictor) {
            evict(elements, size, window, evictorContext);
        }
    }
    @Override
    public void evictAfter(Iterable<TimestampedValue<String>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
        if (!isBeforeEvictor) {
            evict(elements, size, window, evictorContext);
        }
    }
    // Removes every element whose value contains "error".
    private void evict(Iterable<TimestampedValue<String>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
        for (Iterator<TimestampedValue<String>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<String> value = iterator.next();
            String strValue = value.getValue();
            if (strValue.contains("error")) {
                iterator.remove();
            }
        }
    }
}

public class UserDefineCountTrigger extends Trigger<String, GlobalWindow> {
    private Long maxCount;
    private ReducingStateDescriptor<Long> reducingStateDescriptor;
    public UserDefineCountTrigger(Long maxCount) {
        this.maxCount = maxCount;
        reducingStateDescriptor = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }, LongSerializer.INSTANCE);
    }
    @Override
    public TriggerResult onElement(String element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> countState = ctx.getPartitionedState(reducingStateDescriptor);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            countState.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        // Release the counter state when the window is cleaned up.
        ctx.getPartitionedState(reducingStateDescriptor).clear();
    }
}

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.socketTextStream("HadoopNode00", 9999)
  .windowAll(GlobalWindows.create())
  .trigger(new UserDefineCountTrigger(5L))
  .evictor(new UserDefineErrorEvictor(false))
  .apply(new AllWindowFunction[String, String, GlobalWindow] {
    override def apply(window: GlobalWindow, input: Iterable[String], out: Collector[String]): Unit = {
      out.collect(input.mkString(" | "))
    }
  })
  .print()
fsEnv.execute("FlinkErrorEvictor")
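
To make the interplay of the count trigger and the evictor easier to follow, here is a minimal plain-Java model that does not use the Flink API. The class name `CountTriggerEvictorSketch` and its methods are hypothetical. It assumes the behavior described above: the trigger fires with FIRE (so the window contents are kept), and the evictor removes elements containing "error" either before or after the window function runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a GlobalWindow with a count trigger and an "error" evictor.
final class CountTriggerEvictorSketch {
    private final List<String> windowState = new ArrayList<>();
    private final long maxCount;
    private final boolean evictBefore;
    private long count = 0;

    CountTriggerEvictorSketch(long maxCount, boolean evictBefore) {
        this.maxCount = maxCount;
        this.evictBefore = evictBefore;
    }

    /** Adds one element; returns the window output if the trigger fires, otherwise null. */
    String onElement(String element) {
        windowState.add(element);
        count++;
        if (count < maxCount) {
            return null; // corresponds to TriggerResult.CONTINUE
        }
        count = 0; // corresponds to FIRE: reset the counter, keep the window contents
        if (evictBefore) {
            windowState.removeIf(s -> s.contains("error")); // evictBefore
        }
        String output = String.join(" | ", windowState);    // window function
        if (!evictBefore) {
            windowState.removeIf(s -> s.contains("error")); // evictAfter
        }
        return output;
    }
}
```

With `evictBefore = false`, as in the job above, the first firing still prints the "error" elements; they are only missing from later firings, while the other elements stay in the global window.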

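EventTime (key topic)

Flink supports window computation under three notions of time: processing time, event time, and ingestion time.

Setting the time characteristic

Users can change the time characteristic of a streaming job with fsEnv#setStreamTimeCharacteristic. If nothing is set, Flink defaults to processing time. Flink currently supports three time characteristics; ProcessingTime and IngestionTime need no further configuration, because those timestamps are generated by Flink itself.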

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

If EventTime is selected, the job must also supply a watermark mechanism. Flink computes the watermark as:

watermark = (max event time seen by the operator node - max out-of-orderness)

All event-time windows use EventTimeTrigger as their default trigger. Its default behavior is to fire the window function once the watermark passes the window end time. Once watermark - allowed lateness >= window end time, the system deletes the window.
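
The watermark formula and the firing rule can be checked by hand with a small plain-Java sketch. It does not use the Flink API, and the class `WatermarkSketch` with its methods is hypothetical. One detail is stated here as an assumption about Flink's internals: EventTimeTrigger compares the watermark against the window's last included millisecond (end - 1), which is why the sketch uses `windowEnd - 1`.

```java
// Hypothetical model of bounded out-of-orderness watermarking.
final class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxEventTime = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Records one event timestamp, keeping the largest seen so far. */
    void observe(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
    }

    /** watermark = max event time seen - max out-of-orderness. */
    long currentWatermark() {
        if (maxEventTime == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing observed yet, avoid underflow
        }
        return maxEventTime - maxOutOfOrderness;
    }

    /** True once the watermark has reached the last millisecond of the window [start, windowEnd). */
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }
}
```

For example, with 2 s of out-of-orderness, an event at 1577672530000 yields a watermark of 1577672528000, so a window ending at 1577672532000 does not fire yet. An event at 1577672534000 raises the watermark to 1577672532000 and the window fires.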

Assigning timestamps and watermarks

  • AssignerWithPeriodicWatermarks - computes the watermark periodically, at a fixed time interval (recommended)
class WordAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[(String, Long)] {
  private val maxOrderness: Long = 2000L // maximum out-of-orderness: 2 s
  private var maxEventTime: Long = -1L
  private val sdf = new SimpleDateFormat("HH:mm:ss")
  // Compute the watermark; called once per auto-watermark interval.
  override def getCurrentWatermark: Watermark = {
    println("current watermark: " + sdf.format(maxEventTime - maxOrderness))
    new Watermark(maxEventTime - maxOrderness)
  }
  // Extract the event timestamp and track the largest one seen so far.
  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    maxEventTime = Math.max(element._2, maxEventTime)
    element._2
  }
}
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
  • AssignerWithPunctuatedWatermarks - computes the watermark immediately for every incoming record
class WordAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[(String, Long)] {
  private val maxOrderness: Long = 2000L // maximum out-of-orderness: 2 s
  private var maxEventTime: Long = -1L
  private val sdf = new SimpleDateFormat("HH:mm:ss")
  // Compute the watermark; called right after extractTimestamp for every record.
  override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {
    maxEventTime = Math.max(extractedTimestamp, maxEventTime)
    println("current watermark: " + sdf.format(maxEventTime - maxOrderness))
    new Watermark(maxEventTime - maxOrderness)
  }
  // Extract the event timestamp.
  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    element._2
  }
}

Complete example

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//fsEnv.getConfig.setAutoWatermarkInterval(1000L)
// input format: <word> <timestamp>
// a 1577672530000   10:22:10
fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (tokens(0), tokens(1).toLong))
  .assignTimestampsAndWatermarks(new WordAssignerWithPunctuatedWatermarks)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(4)))
  .process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
    private val sdf = new SimpleDateFormat("HH:mm:ss")
    override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
      val window = context.window
      val start = sdf.format(window.getStart)
      val end = sdf.format(window.getEnd)
      val result = elements.toList.sortBy(_._2).map(t => t._1 + ":" + sdf.format(t._2)).mkString(" | ")
      out.collect(start + " ~ " + end + "\t" + result)
    }
  })
  .print()
fsEnv.execute("FlinkWordCountsTumblingWindow_ReduceFunction")
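
To predict which 4-second window a given input line falls into, the window bounds can be computed by hand. The plain-Java sketch below is a simplified model, not Flink code; the class `TumblingWindowSketch` is hypothetical, and it assumes tumbling windows with no offset that are aligned to the epoch and cover [start, start + size).

```java
// Hypothetical helper that computes tumbling-window bounds for a timestamp.
final class TumblingWindowSketch {
    private TumblingWindowSketch() {
    }

    /** Start of the epoch-aligned window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }
}
```

For the sample record `a 1577672530000`, the 4000 ms window is [1577672528000, 1577672532000). A record stamped exactly 1577672532000 already belongs to the next window.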

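Late data

By default Flink does not process late data: once the watermark passes a window's end time, later elements for that window no longer take part in its computation, because Flink has already deleted the window. Users can configure the maximum tolerated delay with allowedLateness. The window is then deleted only when watermark - allowedLateness >= window end time.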

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
// input format: <word> <timestamp>
// a 1577672530000   10:22:10
fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (tokens(0), tokens(1).toLong))
  .assignTimestampsAndWatermarks(new WordAssignerWithPunctuatedWatermarks)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(4)))
  .allowedLateness(Time.seconds(2))
  .process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
    private val sdf = new SimpleDateFormat("HH:mm:ss")
    override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
      val window = context.window
      val start = sdf.format(window.getStart)
      val end = sdf.format(window.getEnd)
      val result = elements.toList.sortBy(_._2).map(t => t._1 + ":" + sdf.format(t._2)).mkString(" | ")
      out.collect(start + " ~ " + end + "\t" + result)
    }
  })
  .print()
fsEnv.execute("FlinkWordCountsTumblingWindow_ReduceFunction")

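Once watermark - allowedLateness >= the end time of Window1, any further element that falls into Window1 can no longer join the window computation, because the system has already deleted Window1 completely. By default Flink discards such elements (too-late data). Users can capture too-late data with the side output mechanism, but that data cannot take part in the window computation again.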

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setParallelism(1)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
// input format: <word> <timestamp>
// a 1577672530000   10:22:10
val lateTag = new OutputTag[(String, Long)]("lateTag")
val windowStream = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (tokens(0), tokens(1).toLong))
  .assignTimestampsAndWatermarks(new WordAssignerWithPunctuatedWatermarks)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(4)))
  .allowedLateness(Time.seconds(2))
  .sideOutputLateData(lateTag)
  .process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
    private val sdf = new SimpleDateFormat("HH:mm:ss")
    override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
      val window = context.window
      val start = sdf.format(window.getStart)
      val end = sdf.format(window.getEnd)
      val result = elements.toList.sortBy(_._2).map(t => t._1 + ":" + sdf.format(t._2)).mkString(" | ")
      out.collect(start + " ~ " + end + "\t" + result)
    }
  })
// regular window results
windowStream.print("normal")
// too-late elements captured by the side output
windowStream.getSideOutput(lateTag).printToErr("late")
fsEnv.execute("FlinkWordCountsTumblingWindow_ReduceFunction")
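
The three possible fates of an element (on time, late but still accepted, too late) can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink code; `LatenessSketch` and `Fate` are hypothetical names. It assumes that Flink measures lateness against the window's last included millisecond (end - 1), so the boundaries here are one millisecond earlier than the rounded rule quoted in the text.

```java
// Hypothetical model of how an arriving element relates to its window and the watermark.
final class LatenessSketch {
    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, DROPPED_OR_SIDE_OUTPUT }

    private LatenessSketch() {
    }

    /**
     * @param windowEnd       exclusive end of the window the element belongs to
     * @param allowedLateness configured allowed lateness in milliseconds
     * @param watermark       current watermark when the element arrives
     */
    static Fate classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last millisecond covered by the window
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME; // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_BUT_ACCEPTED; // the window still exists and fires again
        }
        return Fate.DROPPED_OR_SIDE_OUTPUT; // the window state has been deleted
    }
}
```

With a window ending at 1577672532000 and 2 s of allowed lateness, an element arriving at watermark 1577672532000 is still accepted, while one arriving at watermark 1577672534000 goes to the side output (or is dropped if no side output is configured).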

Joining

Window Join

stream.join(otherStream)
  .where(<KeySelector>) // join key of the first stream
  .equalTo(<KeySelector>) // join key of the second stream
  .window(<WindowAssigner>) // window type
  .apply(<JoinFunction>) // join logic
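
The semantics of the template can be illustrated without Flink: two elements are joined when they share a key and fall into the same window. The plain-Java sketch below is a simplified model over in-memory lists; `WindowJoinSketch`, `Event`, and `tumblingJoin` are hypothetical names, and tumbling epoch-aligned windows are assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an inner join over tumbling windows.
final class WindowJoinSketch {
    /** A keyed, timestamped record standing in for a stream element. */
    static final class Event {
        final String key;
        final String value;
        final long timestamp;

        Event(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private WindowJoinSketch() {
    }

    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Emits "leftValue,rightValue" for every pair with equal keys in the same window. */
    static List<String> tumblingJoin(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.timestamp, size) == windowStart(r.timestamp, size);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }
}
```

Elements without a partner in the same window produce no output, which matches the inner-join behavior of a window join.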

Tumbling Window Join

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.setParallelism(1)
// user record: 001 zhangsan 18 1577672530000
val userstream = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (User(tokens(0), tokens(1), tokens(2).toInt), tokens(3).toLong))
  .assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
// order record: 1 apple 2 4.5 001 1577672530000
val orderstream = fsEnv.socketTextStream("HadoopNode00", 8888)
  .map(line => line.split("\\s+"))
  .map(tokens => (OrderItem(tokens(0), tokens(1), tokens(2).toInt, tokens(3).toDouble, tokens(4)), tokens(5).toLong))
  .assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)
userstream.join(orderstream)
  .where(t => t._1.id)
  .equalTo(t => t._1.userId)
  .window(TumblingEventTimeWindows.of(Time.seconds(4)))
  //.allowedLateness(Time.seconds(2))
  .apply((u, o) => (u._1.id, u._1.name, o._1.item, o._1.count * o._1.price))
  .print()
fsEnv.execute("FlinkTumblingWindowJoin")

Sliding Window Join

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.setParallelism(1)
// user record: 001 zhangsan 18 1577672530000
val userstream = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (User(tokens(0), tokens(1), tokens(2).toInt), tokens(3).toLong))
  .assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
// order record: 1 apple 2 4.5 001 1577672530000
val orderstream = fsEnv.socketTextStream("HadoopNode00", 8888)
  .map(line => line.split("\\s+"))
  .map(tokens => (OrderItem(tokens(0), tokens(1), tokens(2).toInt, tokens(3).toDouble, tokens(4)), tokens(5).toLong))
  .assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)
userstream.join(orderstream)
  .where(t => t._1.id)
  .equalTo(t => t._1.userId)
  .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(2)))
  .apply((u, o) => (u._1.id, u._1.name, o._1.item, o._1.count * o._1.price))
  .print()
fsEnv.execute("FlinkSlidingWindowJoin")

Session Window Join

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.setParallelism(1)
// user record: 001 zhangsan 18 1577672530000
val userstream = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (User(tokens(0), tokens(1), tokens(2).toInt), tokens(3).toLong))
  .assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
// order record: 1 apple 2 4.5 001 1577672530000
val orderstream = fsEnv.socketTextStream("HadoopNode00", 8888)
  .map(line => line.split("\\s+"))
  .map(tokens => (OrderItem(tokens(0), tokens(1), tokens(2).toInt, tokens(3).toDouble, tokens(4)), tokens(5).toLong))
  .assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)
userstream.join(orderstream)
  .where(t => t._1.id)
  .equalTo(t => t._1.userId)
  .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
  .apply((u, o) => (u._1.id, u._1.name, o._1.item, o._1.count * o._1.price))
  .print()
fsEnv.execute("FlinkSessionWindowJoin")

Interval Join

The interval join joins elements of two streams (we’ll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A.

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.getConfig.setAutoWatermarkInterval(1000L)
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.setParallelism(1)
// user record: 001 zhangsan 18 1577672530000
val userstream = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(line => line.split("\\s+"))
  .map(tokens => (User(tokens(0), tokens(1), tokens(2).toInt), tokens(3).toLong))
  .assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks)
  .keyBy(t => t._1.id)
// order record: 1 apple 2 4.5 001 1577672530000
val orderstream = fsEnv.socketTextStream("HadoopNode00", 8888)
  .map(line => line.split("\\s+"))
  .map(tokens => (OrderItem(tokens(0), tokens(1), tokens(2).toInt, tokens(3).toDouble, tokens(4)), tokens(5).toLong))
  .assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks)
  .keyBy(_._1.userId)
userstream.intervalJoin(orderstream)
  .between(Time.seconds(0), Time.seconds(2))
  .process(new ProcessJoinFunction[(User, Long), (OrderItem, Long), String] {
    override def processElement(left: (User, Long), right: (OrderItem, Long),
                                ctx: ProcessJoinFunction[(User, Long), (OrderItem, Long), String]#Context,
                                out: Collector[String]): Unit = {
      out.collect(left._1.id + "," + left._1.name + "," + left._1.age + "," + right._1.item + "," + (right._1.count * right._1.price))
    }
  })
  .print()
fsEnv.execute("FlinkIntervalJoin")
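
The interval condition a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound is easy to test in isolation. The plain-Java sketch below only restates that inequality; the class `IntervalJoinSketch` and its method are hypothetical and do not use the Flink API. Both bounds are treated as inclusive, as in the formula above.

```java
// Hypothetical helper that evaluates the interval-join time condition.
final class IntervalJoinSketch {
    private IntervalJoinSketch() {
    }

    /** True if b lies in [a + lowerBound, a + upperBound], bounds given in milliseconds. */
    static boolean inInterval(long aTimestamp, long bTimestamp, long lowerBound, long upperBound) {
        return aTimestamp + lowerBound <= bTimestamp && bTimestamp <= aTimestamp + upperBound;
    }
}
```

With between(0 s, 2 s) as in the job above, a user record at 1577672530000 matches orders stamped from 1577672530000 to 1577672532000 inclusive; an order at 1577672532001 or at 1577672529999 does not match.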

CEP Pattern

FlinkCEP is the Complex Event Processing (CEP) library built on top of Flink's stream processing. It lets users detect the event patterns they care about in an unbounded data stream and extract the matching events.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.8.1</version>
</dependency>
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// input format: <userId> <name> <action>
//001 zhangsan login
//001 zhangsan view
//001 zhangsan addCar
//001 zhangsan buy
val userAction = fsEnv.socketTextStream("HadoopNode00", 9999)
  .map(_.split("\\s+"))
  .map(tokens => (tokens(0), tokens(1), tokens(2)))
  .keyBy(_._1)
val pattern = Pattern.begin[(String, String, String)]("start")
  .where(_._3.equals("login"))
  .followedBy("end") // a buy after the login; other events may occur in between
  .where(_._3.equals("buy"))
  .timesOrMore(2) // at least 2 times
  .consecutive() // the matched buy events must not be interrupted
  .within(Time.seconds(10)) // everything must happen within 10 seconds
val patternStream: PatternStream[(String, String, String)] = CEP.pattern(userAction, pattern)
patternStream.select(new PatternSelectFunction[(String, String, String), String] {
  override def select(pattern: util.Map[String, util.List[(String, String, String)]]): String = {
    val start = pattern.get("start").get(0)
    val next = pattern.get("end")
    start._1 + " " + start._2 + " " + next.asScala.map(_._3).mkString(" | ")
  }
})
  .print()
fsEnv.execute("FlinkCEPDemo")

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/cep.html

Flink HA

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html

  • Set up an HDFS HA cluster and make sure it starts correctly (covered in an earlier lesson)
  • Configure HADOOP_CLASSPATH
  • Configure Flink HA (prepare HadoopNode01~03)
[root@HadoopNodeXX ~]# mkdir /home/flink
[root@HadoopNodeXX ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /home/flink
[root@HadoopNodeXX ~]# cd /home/flink/
[root@HadoopNodeXX flink]# cd flink-1.8.1/
[root@HadoopNodeXX flink-1.8.1]# vi conf/masters
HadoopNode01:8081
HadoopNode02:8081
HadoopNode03:8081
[root@HadoopNodeXX flink-1.8.1]# vi conf/slaves
HadoopNode01
HadoopNode02
HadoopNode03
[root@HadoopNodeXX flink-1.8.1]# vi conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: HadoopNode01:2181,HadoopNode02:2181,HadoopNode03:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default_ns
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
state.backend.incremental: false
state.backend.rocksdb.ttl.compaction.filter.enabled: true

Starting the Flink cluster

[root@HadoopNode01 flink-1.8.1]# ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host HadoopNode01.
Starting standalonesession daemon on host HadoopNode02.
Starting standalonesession daemon on host HadoopNode03.
Starting taskexecutor daemon on host HadoopNode01.
Starting taskexecutor daemon on host HadoopNode02.
Starting taskexecutor daemon on host HadoopNode03.

Users can start or stop the JobManager on a node with ./bin/jobmanager.sh start|stop
