一、Flink 流处理 API

1、Environment

  • getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则
此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法
返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方
式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

(1) 从集合中读取数据

具体代码实现:

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._//温度传感器读取样例类
case class SensorReading( id: String, timestamp: Long, temperature: Double )object SourceTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 从集合中读取数据val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),SensorReading("sensor_6", 1547718201, 15.402984393403084),SensorReading("sensor_7", 1547718202, 6.720945201171228),SensorReading("sensor_10", 1547718205, 38.101067604893444)))stream1.print("stream1").setParallelism(1)  //并行度为1//stream1.print("stream1").setParallelism(6)  //并行度为6env.execute("source api test")}
}

启动程序,控制台打印数据信息

(2)第二种方式:从文件读取数据

先在src/main/resources文件目录下创建sensor.txt文本,里面编辑的内容如下:

sensor_1,  1547718199,  35.80018327300259
sensor_6,  1547718201,  15.402984393403084
sensor_7,  1547718202,  6.720945201171228
sensor_10, 1547718205,  38.101067604893444

代码实现:

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._//温度传感器读取样例类
case class SensorReading( id: String, timestamp: Long, temperature: Double )object SourceTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 从集合中读取数据val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),SensorReading("sensor_6", 1547718201, 15.402984393403084),SensorReading("sensor_7", 1547718202, 6.720945201171228),SensorReading("sensor_10", 1547718205, 38.101067604893444)))//2、从文件读取数据val stream2 = env.readTextFile("D:Flinksrcmainresourcessensor.txt")stream2.print("stream2").setParallelism(1)env.execute("source api test")}
}

启动程序,控制台打印数据信息

另外:

(3)第三种方式:从kafka中读取数据

  • 需要在pom文件引入 kafka 连接器的依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version>
</dependency>

  • 编写代码

基本代码实现:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new
SimpleStringSchema(), properties))

具体代码实现:

package com.apache.flinkapi
import java.util.Propertiesimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
//温度传感器读取样例类
case class SensorReading( id: String, timestamp: Long, temperature: Double )object SourceTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 从集合中读取数据val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),SensorReading("sensor_6", 1547718201, 15.402984393403084),SensorReading("sensor_7", 1547718202, 6.720945201171228),SensorReading("sensor_10", 1547718205, 38.101067604893444)))// env.fromElements(1,2.0,"string").print()//2、从文件读取数据val stream2 = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//3、从kafka中读取数据val properties = new Properties()properties.setProperty("bootstrap.servers", "spark2.x:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))stream3.print("stream3").setParallelism(1)env.execute("source api test")}
}

  • 先把kakfka进程启动起来
[root@spark2 kafka]# jps
18276 Kafka
17468 QuorumPeerMain
18574 Jps

  • 创建topic
//创建一个topic名叫sensor
[root@spark2 kafka]# bin/kafka-topics.sh --create --zookeeper spark2.x:2181 --partitions 2 --replication-factor 1 --topic sensor
Created topic "sensor".//创建生产者
[root@spark2 kafka]# bin/kafka-console-producer.sh --broker-list spark2.x:9092 --topic sensor// 先把idea代码程序跑起来,之后我们就可以在kafka环境生产数据
>sensor_1,  1547718199,  35.80018327300259
>sensor_6,  1547718201,  15.402984393403084
>sensor_7,  1547718202,  6.720945201171228
>sensor_1,  1547718199,  35.80018327300259
>sensor_10, 1547718205,  38.101067604893444        

之后,先把idea代码程序跑起来,之后我们就可以在kafka环境生产数据了,如图所示:

总结:

往大方面想,比如:保证kafka数据的容错性、一致性,在集群运行环境过程中,若机器发生宕机了,如何解决?涉及检查点,而这个检查点(Checkpoints),它原理机制:就是存盘

当我们使用kafka把数据读进来后,它 的偏移量就发生改变,若想把kakfa的数据进行再消费一遍,如何解决?

在spark中

第一种方式:,在整个处理完成之后,加一个机制,不要让它消费,直接把它的偏移量更改,到最后整个完成之后,再把它的偏移量做更改

第二种方式:在恢复之前后的状态的保存点的数据都要存下来,恢复的时候在之前kafka的偏移量手动更改,重新提交一次,相当于告诉kafka,重新再消费一次

在Flink中

与spark有所不同,区别:spark是一批一批的读取数据,而flink是一条一条的读取数据;

另外,flink的特性是:有状态的流处理,在这种过程中可以保证状态,所以可以把我们kafka的偏移量作为状态保存下来,言下之意:等到恢复的时候有检查点机制,后面处理的过程中也挂了,在之前存盘的重新读取、重新恢复,就相当于把之前偏移量恢复出来,然后flink本身的kafka连接器实现了,相当于重新手动提交的偏移量

(4)第四种方式:自定义数据源

我们希望可以随机生成传感器数据,基本代码实现:

class MySensorSource extends SourceFunction[SensorReading]{// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {running = false
}
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {// 初始化一个随机数发生器
val rand = new Random()
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 ) )
while(running){// 更新温度值
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian() )
)
// 获取当前时间戳
val curTime = System.currentTimeMillis()
curTemp.foreach(
t => ctx.collect(SensorReading(t._1, curTime, t._2))
)
Thread.sleep(100) } } }

具体代码实现:

package com.apache.flinkapi
import java.util.Propertiesimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunctionimport scala.util.Random
//温度传感器读取样例类
case class SensorReading( id: String, timestamp: Long, temperature: Double )object SourceTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 从集合中读取数据val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),SensorReading("sensor_6", 1547718201, 15.402984393403084),SensorReading("sensor_7", 1547718202, 6.720945201171228),SensorReading("sensor_10", 1547718205, 38.101067604893444)))// env.fromElements(1,2.0,"string").print()//2、从文件读取数据val stream2 = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//3、从kafka中读取数据val properties = new Properties()properties.setProperty("bootstrap.servers", "spark2.x:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))// 4. 自定义数据源val stream4 = env.addSource(new SensorSource())stream4.print("stream4").setParallelism(1)env.execute("source api test")}
}
class SensorSource() extends SourceFunction[SensorReading]{// 定义一个flag:表示数据源是否还在正常运行var running: Boolean = true//取消数据源的生成override def cancel(): Unit = {running =false}//正常生成的数据override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {//初始化一个随机数发生器val rand = new Random()//初始化定义一组传感器温度数据var curTemp=1.to(10).map(//nextGaussian高之分布随机数相当于坐标函数i =>("sensor_"+i,60 + rand.nextGaussian()*10))//无限循环产生流数据while(running){//在前一次温度的基础上更新温度curTemp = curTemp.map(t => (t._1, t._2 + rand.nextGaussian()))//获取当前的时间戳val curTime = System.currentTimeMillis()// 包装成SensorReading,输出curTemp.foreach(t => ctx.collect( SensorReading(t._1, curTime, t._2) ))// 间隔100msThread.sleep(100)}}
}

启动程序,控制台打印数据信息:数据不断产生、时间戳、温度不断变化

数据不断产生https://www.zhihu.com/video/1241021669457362944

2、Transform--转换算子

(1)map

val streamMap = stream.map { x => x * 2 }

(2)flatMap

flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
例如: flatMap(List(1,2,3))(i ⇒ List(i,i))
结果是 List(1,1,2,2,3,3),
而 List("a b", "c d").flatMap(line ⇒ line.split(" "))
结果是 List(a, b, c, d)。

val streamFlatMap = stream.flatMap{ x => x.split(" ")
}

(3) Filter

val streamFilter = stream.filter{x => x == 1
}

(4)KeyBy

DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分

区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

(5) 滚动聚合算子(Rolling Aggregation)

这些算子可以针对 KeyedStream 的每一个支流做聚合。

  •  sum()
  •  min()
  •  max()
  •  minBy()
  •  maxBy()

(6)Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素
和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是
只返回最后一次聚合的最终结果。

val stream2 = env.readTextFile("YOUR_PATHsensor.txt").map( data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong,
dataArray(2).trim.toDouble)}).keyBy("id").reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )

具体代码实现:基本转换算子

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._object TransformTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 读入数据val streamFromFile = env.readTextFile("D:Flinksrcmainresourcessensor.txt")val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)}).keyBy(0).sum(2)dataStream.print()env.execute("transform test job")}
}

启动程序,控制台打印数据信息:

需求:输出当前传感器最新的温度+10,而时间戳是上一次数据 时间+1

具体代码实现:聚合算子

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._object TransformTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 读入数据val streamFromFile = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//1、基本转换算子与聚合算子val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)}).keyBy(0)// .sum(2)//输出当前传感器最新的温度+10,而时间戳是上一次数据 时间+1.reduce((x,y)=>SensorReading(x.id,x.timestamp+1,y.temperature+10))dataStream.print()env.execute("transform test job")}
}

启动程序,控制台打印数据信息:

(7)Split 和 Select(spile分流操作

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者
多个 DataStream。

SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个 DataStream。
需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流。

spile分流,具体代码实现

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._object TransformTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 读入数据val streamFromFile = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//1、基本转换算子与聚合算子val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)})val aggStream=dataStream.keyBy("id")// .sum(2)//输出当前传感器最新的温度+10,而时间戳是上一次数据 时间+1.reduce((x,y)=>SensorReading(x.id,x.timestamp+1,y.temperature+10))//2、多流转换算子//spile分流val splitStream = dataStream.split(data=>{if(data.temperature > 30) Seq("hight")else Seq("low")})val hight=splitStream.select("hight")val low=splitStream.select("low")val all=splitStream.select("hight","low")//   dataStream.print()hight.print("hight")low.print("low")low.print("low")env.execute("transform test job")}
}

启动程序,控制台打印数据信息:

(8)Connect 和 CoMap (合并操作)

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据
流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各
自的数据和形式不发生任何变化,两个流相互独立。

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map
和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap
处理。

具体代码实现:

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._object TransformTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 读入数据val streamFromFile = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//1、基本转换算子与聚合算子val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)})val aggStream=dataStream.keyBy("id")// .sum(2)//输出当前传感器最新的温度+10,而时间戳是上一次数据 时间+1.reduce((x,y)=>SensorReading(x.id,x.timestamp+1,y.temperature+10))//2、多流转换算子//spile分流val splitStream = dataStream.split(data=>{if(data.temperature > 30) Seq("hight")else Seq("low")})val hight=splitStream.select("hight")val low=splitStream.select("low")val all=splitStream.select("hight","low")//合并流val warning =hight.map(data =>(data.id,data.temperature))val connectStream = warning.connect(low)val coMapDataStream = connectStream.map(warningData =>(warningData._1,warningData._2,"warning"),lowData =>(lowData.id,"healthy"))coMapDataStream.print()env.execute("transform test job")}
}

启动程序,控制台打印数据信息:

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操
作,产生一个包含所有 DataStream 元素的新 DataStream。

//合并以后打印
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::") 

Connect 与 Union 区别:

  • Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。
  • Connect 只能操作两个流,Union 可以操作多个

union方式,具体代码实现:

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._object TransformTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 读入数据val streamFromFile = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//1、基本转换算子与聚合算子val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)})val aggStream=dataStream.keyBy("id")// .sum(2)//输出当前传感器最新的温度+10,而时间戳是上一次数据 时间+1.reduce((x,y)=>SensorReading(x.id,x.timestamp+1,y.temperature+10))//2、多流转换算子//spile分流val splitStream = dataStream.split(data=>{if(data.temperature > 30) Seq("hight")else Seq("low")})val hight=splitStream.select("hight")val low=splitStream.select("low")val all=splitStream.select("hight","low")//3、合并流val warningStream =hight.map(data =>(data.id,data.temperature))val connectStream = warningStream.connect(low)val coMapDataStream = connectStream.map(warningData =>(warningData._1,warningData._2,"warning"),lowData =>(lowData.id,"healthy"))//union关键代码val unionStream = hight.union(low)unionStream.print("unionStream")env.execute("transform test job")}
}

启动程序,控制台打印数据信息:

3、支持的数据类型

Flink 流应用程序处理的是以数据对象表示的事件流。所以在 Flink 内部,我们
需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;
或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明
确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并
为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获
取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda
函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性
能。

Flink 支持 Java 和 Scala 中所有常见数据类型。使用最广泛的类型有以下几种。

  • 基础数据类型

Flink 支持所有的 Java 和 Scala 基础数据类型,Int, Double, Long, String, …

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )

  • Java 和 Scala 元组(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements( ("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)

  • Scala 样例类(case classes)
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)

  • Java 简单对象(POJOs)
public class Person {public String name;
public int age;public Person() {}public Person(String name, int age) {
this.name = name;
this.age = age;
} }
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));

  • 其它(Arrays, Lists, Maps, Enums, 等等)

Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的
ArrayList,HashMap,Enum 等等。

4、实现 UDF 函数——更细粒度的控制流

(1)函数类(Function Classes)

Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如
MapFunction, FilterFunction, ProcessFunction 等等。
下面例子实现了 FilterFunction 接口:

class FilterFilter extends FilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")}
}
val flinkTweets = tweets.filter(new FlinkFilter)

还可以将函数实现成匿名类

val flinkTweets = tweets.filter(
new RichFilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink") }}})

我们 filter 的字符串"flink"还可以当作参数传进去

val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {override def filter(value: String): Boolean = {value.contains(keyWord)
}}

具体代码实现:

package com.apache.flinkapiimport org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._object TransformTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 读入数据val streamFromFile = env.readTextFile("D:Flinksrcmainresourcessensor.txt")//1、基本转换算子与聚合算子val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)})val aggStream=dataStream.keyBy("id")//输出当前传感器最新的温度+10,而时间戳是上一次数据 时间+1.reduce((x,y)=>SensorReading(x.id,x.timestamp+1,y.temperature+10))//2、多流转换算子//spile分流val splitStream = dataStream.split(data=>{if(data.temperature > 30) Seq("hight")else Seq("low")})val hight=splitStream.select("hight")val low=splitStream.select("low")val all=splitStream.select("hight","low")//3、合并流val warningStream =hight.map(data =>(data.id,data.temperature))val connectStream = warningStream.connect(low)val coMapDataStream = connectStream.map(warningData =>(warningData._1,warningData._2,"warning"),lowData =>(lowData.id,"healthy"))//union关键代码val unionStream = hight.union(low)//UDF 函数dataStream.filter( new MyFilter() ).print()env.execute("transform test job")}
}class MyFilter() extends FilterFunction[SensorReading]{override def filter(t: SensorReading): Boolean = {t.id.startsWith("sensor_1")}
}

启动程序,控制台打印数据信息:

(2)匿名函数(Lambda Functions)

  dataStream.filter( data => data.id.startsWith("sensor_1") ).print()

(3)富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都
有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一
些生命周期方法,所以可以实现更复杂的功能。

  •  RichMapFunction
  •  RichFlatMapFunction
  •  RichFilterFunction
  •  …

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  •  open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter

被调用之前 open()会被调用。

  •  close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  •  getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函

数执行的并行度,任务的名字,以及 state 状态

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {if (in % 2 == subTaskIndex) {out.collect((subTaskIndex, in))
} }
override def close(): Unit = {// 以下做一些清理工作,例如断开和 HDFS 的连接。
}
}

5、Sink

Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的
输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。

stream.addSink(new MySink(xxxx))

官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。

(1)pom.xml 引入

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version>
</dependency>

(2)Kafka Sink代码编写

package com.apache.flinkapiimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011object KafkaSinkTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2、从文件读取数据val inputStream = env.readTextFile("D:Flinksrcmainresourcessensor.txt")// Transform操作val dataStream = inputStream.map(data => {val dataArray = data.split(",")SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString  // 转成String方便序列化输出})// sinkdataStream.addSink( new FlinkKafkaProducer011[String]( "spark2.x:9092","sinkTest", new SimpleStringSchema()) )dataStream.print()env.execute("kafka sink test")}
}

(3)虚拟机创建kafka的消费者

[root@spark2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server spark2.x:9092 --topic sinkTest

(4 ) 启动程序,控制台打印数据信息:

上面这种方式方便测试,而在工作当中,我们在获取数据不是在文件读数据的,我们使用kafka一边生产数据一边消费数据,这种方式需要如下操作:

  • Kafka Sink代码编写
package com.apache.flinkapiimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}object KafkaSinkTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val properties = new Properties()properties.setProperty("bootstrap.servers", "spark2.x:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))// Transform操作val dataStream = inputStream.map(data => {val dataArray = data.split(",")SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString  // 转成String方便序列化输出})// sinkdataStream.addSink( new FlinkKafkaProducer011[String]( "spark2.x:9092","sinkTest", new SimpleStringSchema()) )dataStream.print()env.execute("kafka sink test")}
}

  • 虚拟机在kafka创建生产数据与消费数据
//启动kafka服务
[root@spark2 kafka]# bin/kafka-server-start.sh config/server.properties &//查看kafka进程
[root@spark2 ~]# jps
25633 Kafka
26721 ConsoleProducer
17468 QuorumPeerMain
[root@spark2 ~]#//创建生产者
[root@spark2 kafka]# bin/kafka-console-producer.sh --broker-list spark2.x:9092 --topic sensor//创建消费者
[root@spark2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server spark2.x:9092 --topic sinkTest

  • 启动idea代码程序,当我们在虚拟机的kafka生产数据,控制台也将会打印出来,同时消费者也会消费生产者的 数据,如图所示:

视频演示:

Kafka Sink生产与消费https://www.zhihu.com/video/1241138716938530816

6、Redis 的Sink操作

(1)pom.xml 引入

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

(2)Redis 代码编写

package com.apache.flinkapiimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}object RedisSinkTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//从文件读取数据val inputStream = env.readTextFile("D:Flinksrcmainresourcessensor.txt")// transformval dataStream = inputStream.map(data => {val dataArray = data.split(",")SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )})val conf = new FlinkJedisPoolConfig.Builder().setHost("spark2.x").setPort(6379).build()// sinkdataStream.addSink(new RedisSink(conf, new MyRedisMapper()))env.execute("redis sink test")}}
class MyRedisMapper() extends RedisMapper[SensorReading]{// 定义保存数据到redis的命令override def getCommandDescription: RedisCommandDescription = {// 把传感器id和温度值保存成哈希表 HSET key field valuenew RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )}// 定义保存到redis的keyoverride def getKeyFromData(t: SensorReading): String = t.id// 定义保存到redis的valueoverride def getValueFromData(t: SensorReading): String = t.temperature.toString
}

(3)虚拟机命令行基本操作

//启动redis服务
[root@spark2 ~]# service redisd start
Starting Redis server...
8730:C 17 Feb 13:23:28.977 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
8730:C 17 Feb 13:23:28.978 # Redis version=4.0.6, bits=64, commit=00000000, modified=0, pid=8730, just started
8730:C 17 Feb 13:23:28.978 # Configuration loaded//启动redis客户端
[root@spark2 ~]# redis-cli
127.0.0.1:6379> //查看当前的数据,表示是空数据
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> 

(4)启动idea代码的程序,控制台打印无数据,表示跑完了

(5)再次到redis的数据库查看

查看当前的数据,发现数据进来了
127.0.0.1:6379> keys *
1) "sensor_temperature"//查看数据内容
127.0.0.1:6379> hgetall sensor_temperature1) "sensor_1"2) "31.0"3) "sensor_6"4) "15.402984393403084"5) "sensor_5"6) "21.0"7) "sensor_7"8) "6.720945201171228"9) "sensor_10"
10) "38.101067604893444"
127.0.0.1:6379> 

7、Elasticsearch的Sink操作

(1)pom.xml 引入

  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>1.7.2</version></dependency>

(2)Elasticsearch代码编写

package com.apache.flinkapiimport java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requestsobject EsSinkTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//从文件读取数据val inputStream = env.readTextFile("D:Flinksrcmainresourcessensor.txt")// transformval dataStream = inputStream.map(data => {val dataArray = data.split(",")SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )})val httpHosts = new util.ArrayList[HttpHost]()httpHosts.add(new HttpHost("spark2.x", 9200))// 创建一个esSink 的builderval esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {println("saving data: " + element)// 包装成一个Map或者JsonObjectval json = new util.HashMap[String, String]()json.put("sensor_id", element.id)json.put("temperature", element.temperature.toString)json.put("ts", element.timestamp.toString)// 创建index request,准备发送数据val indexRequest = Requests.indexRequest().index("sensor").`type`("readingdata").source(json)// 利用index发送请求,写入数据indexer.add(indexRequest)println("data saved.")}})// sinkdataStream.addSink(esSinkBuilder.build() )env.execute("es sink test")}
}

(3) 配置Elasticsearch环境,参照自己写的博客:大数据ELK环境平台搭建

查看当前所有的索引:http://spark2.x:9200/_cat/indices?v

(4)启动idea代码的程序,控制台打印出数据

当然,若想查看数据的详细信息,可以这样查看:http://spark2.x:9200/sensor

8、JDBC的Sink操作

(1)pom.xml 引入

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version>
</dependency>

(2)代码编写

package com.apache.flinkapiimport java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
/*** JDBC的Sink操作*/
object JdbcSinkTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//从文件读取数据val inputStream = env.readTextFile("D:Flinksrcmainresourcessensor.txt")// transformval dataStream = inputStream.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)})// sinkdataStream.addSink( new MyJdbcSink() )env.execute("jdbc sink test")}}
class MyJdbcSink() extends RichSinkFunction[SensorReading]{// 定义sql连接、预编译器var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// 初始化,创建连接和预编译语句override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://spark2.x:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false", "root", "123456")insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")}// 调用连接,执行sqloverride def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {// 执行更新语句updateStmt.setDouble(1, value.temperature)updateStmt.setString(2, value.id)updateStmt.execute()// 如果update没有查到数据,那么执行插入语句if( updateStmt.getUpdateCount == 0 ){insertStmt.setString(1, value.id)insertStmt.setDouble(2, value.temperature)insertStmt.execute()}}// 关闭时做清理工作override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()}
}

(3)虚拟机mysql数据库基本创建

//创建test数据库
mysql> create database test;
Query OK, 1 row affected (0.00 sec)//显示当前数据库,看看test是否成功被创建出来
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sys                |
| test               |
+--------------------+
5 rows in set (0.00 sec)//使用test数据库
mysql> use test;
Database changed//显示数据表有哪些,没有就创建
mysql> show tables;
Empty set (0.00 sec)//创建数据表temperatures
mysql> create table temperatures( ->  sensor varchar(20)not null,-> temp double not null);
Query OK, 0 rows affected (0.01 sec)//查看数据表temperatures
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| temperatures   |
+----------------+
1 row in set (0.00 sec)

(4)启动idea代码的程序,控制台打印无数据,表示跑完了

( 5) 在虚拟机查看数据,进来了

mysql> select * from temperatures;
+-----------+--------------------+
| sensor    | temp               |
+-----------+--------------------+
| sensor_1  |                 31 |
| sensor_6  | 15.402984393403084 |
| sensor_7  |  6.720945201171228 |
| sensor_10 | 38.101067604893444 |
| sensor_5  |                 21 |
+-----------+--------------------+
5 rows in set (0.00 sec)


六、Flink 中的 Window

1、Window

1.1、window 概述

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限
数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据 为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大
小的”buckets”桶,我们可以在这些桶上做计算操作。

1.2、window 类型

Window 可以分成两类:

  •  CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
  •  TimeWindow:按照时间生成 Window。

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling

Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

  1. 滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。 特点:时间对齐,窗口长度固定,没有重叠。
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一
个固定的大小,并且不会出现重叠。

例如:如果你指定了一个 5 分钟大小的滚动窗 口,窗口的创建如下图所示:

适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。

2. 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动
间隔组成。 特点:时间对齐,窗口长度固定,可以有重叠。
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大
小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,
滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素
会被分配到多个窗口中。

例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包

含着上个 10 分钟产生的数据,如下图所示:

适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是

否要报警)。

3. 会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的

session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。
session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗
口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它
在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关
闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃
周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将
被分配到新的 session 窗口中去。

2、 Window API

2.1、TimeWindow

TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个

window 里面的所有数据进行计算。

(1)滚动窗口

Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的

数据根据进入 Flink 的时间划分到不同的窗口中。

val minTempPerWindow = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其

中的一个来指定。

(2)滑动窗口(SlidingEventTimeWindows)

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。

下面代码中的 sliding_size 设置为了 5s,也就是说,窗口每 5s 就计算一次,每

一次计算的 window 范围是 15s 内的所有元素。

val minTempPerWindow: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15), Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
.window(EventTimeSessionWindows.withGap(Time.minutes(10))

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其

中的一个来指定。

2.2、CountWindow

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素
数量达到窗口大小的 key 对应的结果。
注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入
的所有元素的总数。

  • 滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量
达到窗口大小时,就会触发窗口的执行。

val minTempPerWindow: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).countWindow(5).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))

  • 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据
就计算一次,每一次计算的 window 范围是 5 个元素。

val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r =>
(r.id, r.temperature)).keyBy(0)
//每当某一个 key 的个数达到 2 的时候,触发计算,计算最近该 key 最近 10 个元素的内容
val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] =
keyedStream.countWindow(10,2)
val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)

2.3、 window function

window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两

类:

  • 增量聚合函数(incremental aggregation functions)

每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction, AggregateFunction。

  • 全窗口函数(full window functions)

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。

ProcessWindowFunction 就是一个全窗口函数。

2.4、其它可选 API

  • .trigger() —— 触发器

定义 window 什么时候关闭,触发计算并输出结果

  • .evitor() —— 移除器

定义移除某些数据的逻辑

  • .allowedLateness() —— 允许处理迟到的数据
  •  .sideOutputLateData() —— 将迟到的数据放入侧输出流
  •  .getSideOutput() —— 获取侧输出流

中读取数据_Flink入门实战 (中)相关推荐

  1. java 从excel中读取数据_在Java中读取Excel文件的内容和导出数据到Excel文件中

    转自www.chianjavaworld.net 原作者:SonyMusic 读:rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr 在Java ...

  2. html 值追加,从JSON中读取数据追加到HTML中

    在写内容逻辑重复性的页面时,用json数据可以显著提高编程效率,并且便于后期的数据维护.因此,在视频专题页面,需要展示多列视频数据,我选择了用json. HTML如下(只展示重点部分,需要引用JQ) ...

  3. 猜数游戏(从文件中读取数据)

    猜数游戏:有三次猜数字机会,如果用完三次机会则需要去网站充值.如果没有用完三次机会则可以继续玩.玩游戏的次数存储在文件中. 思路:(待补充) import java.util.Random; impo ...

  4. 《python 数据可视化编程实战》-从excel表中读取数据

    从excel表中读取数据 #!/usr/nim/env python # _*_ coding:utf-8 _*_ import xlrd from numpy import *file1 = 'D: ...

  5. c 语言如何处理表格文件中的数据库,C#程序从Excel表格中读取数据并进行处理

    今天做了一个Excel表格数据处理的事情,因为数据量表较大(接近7000条)所以处理起来有点麻烦,于是写了一个程序, 先将程序记下以便将来查找. using System; using System. ...

  6. HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引

    1. Hbase高级应用 1.1建表高级属性 下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性 1. BLOOMFILTER 默认是 ...

  7. POI:从Excel文件中读取数据,向Excel文件中写入数据,将Excel表格中的数据插入数据库,将数据库中的数据添加到Excel表

    POI 简介: POI是Apache软件基金会用Java编写的免费开源的跨平台的 Java API,Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能. ...

  8. VC++中从txt文本中读取数据并且存到二维数组中

    这几天因为在做作业,所以不仅会想到这一类有关的问题.现在我需要实现的是讲txt文件的数据读取出来并且存储到一个二维数组中.,首先为了对待什么样的矩阵我们都可以读取,我们就要设置一个动态的矩阵,这样当我 ...

  9. vc++从txt文件中读取数据

    数值分析课上老师说要将数据写在txt文件上,然后让程序从txt文件中读取数据.让本来C++已经遗忘了很久的我们无从下手,在网上也查看了很多,发现大多都是扯淡,放在VC++编辑器上发现并不能运行,不知道 ...

最新文章

  1. Android中的资源访问
  2. MFC关于Radio按钮分组与选择的操作
  3. python常用英文单词怎么写,Python常用英文单词
  4. AttributeError: module 'select' has no attribute 'error'解决方法
  5. 文件修改如何简单修改Android的so文件
  6. 遍历聚合对象中的元素——迭代器模式
  7. layui绑定json_认识定制:JSON绑定概述系列
  8. 创建oracle 数据库表空间,角色,用户的sql语句
  9. 联络员(信息学奥赛一本通-T1393)
  10. 万万没想到,JVM内存区域的面试题也可以问的这么难?
  11. SQL Server 实现递归获取层级数据
  12. QByteArray与char、int、float(及其数组)之间的互相转化
  13. 官方实锤!微软宣布以 75 亿美元收购 GitHub
  14. 《java程序员全攻略:从小工到专家》连载一:外行人眼中的IT人
  15. ADSL路由切换IP
  16. @scheduled注解配置时间_《SpringBoot整合redis、Scheduled/quartz定时任务》
  17. 参数整定类毕业论文文献有哪些?
  18. matlab编写禁忌搜索算法,禁忌搜索算法matlab
  19. 本地上传文件到服务器
  20. windows必备软件系列

热门文章

  1. 首款搭载鸿蒙OS的智能手机,首款搭载鸿蒙OS 华为智选智能摄像头Pro零点全网开售:299元...
  2. onnx 测试_pytorch onnx onnxruntime tensorrt踩坑 各种问题
  3. Java 8 为什么要使用Lambda表达式
  4. 带通滤波中零相位和最小相位_相位器在Perl 6中的工作方式
  5. jmeter 脚本 排除_对Buildah脚本进行故障排除
  6. 运放组成的吉他放大电路_如何通过5个步骤构建开放式硬件吉他放大器
  7. drupal 迁移_关于如何迁移到Drupal的4个技巧
  8. openstack 云_OpenStack作为云,CoreOS等的未来
  9. jquery scrollTop及其应用例子
  10. Bootstrap 按钮菜单的尺寸