Flink介绍

  • 介绍
    • 原理
    • 简单使用
    • 初步编程

介绍

1 什么是Flink

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
Flink起源于Stratosphere项目, 2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会, 2014年12月,Flink一跃成为Apache软件基金会的顶级项目。
2 Flink 对比 Spark
Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。 Spark掀开了内存计算的先河,但是在其火热的同时,开发人员发现,在Spark中,计算框架普遍存在的缺点和不足依然没有完全解决,而这些问题随着5G时代的来临以及决策者对实时数据分析结果的迫切需要而凸显的更加明显:
数据精准一次性处理(Exactly-Once)
乱序数据,迟到数据
低延迟,高吞吐,准确性
容错性
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在Spark火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。
慢慢地,随着这些问题的解决,Flink慢慢被绝大数程序员所熟知并进行大力推广,阿里公司在2015年改进Flink,并创建了内部分支Blink,目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。
3 Flink,Spark
Spark 和 Flink 一开始都拥有着同一个梦想,他们都希望能够用同一个技术把流处理和批处理统一起来,但他们走了完全不一样的两条路前者是以批处理的技术为根本,并尝试在批处理之上支持流计算;后者则认为流计算技术是最基本的,在流计算的基础之上支持批处理。正因为这种架构上的不同,今后二者在能做的事情上会有一些细微的区别。比如在低延迟场景,Spark 基于微批处理的方式需要同步会有额外开销,因此无法在延迟上做到极致。在大数据处理的低延迟场景,Flink 已经有非常大的优势。
Spark和Flink的主要差别就在于计算模型不同。Spark采用了微批处理模型,而Flink采用了基于操作符的连续流模型。因此,对Apache Spark和Apache Flink的选择实际上变成了计算模型的选择,而这种选择需要在延迟、吞吐量和可靠性等多个方面进行权衡。
批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于需要长时间运行的离线统计。在Spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于延迟小的实时统计。在Flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,
无界数据流:无界数据流有开始但是没有结束,必须在获取流数据后立即处理。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取数据,以便能够推断结果完整性。
有界数据流:有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取指定范围内所有数据来处理有界流,处理有界流不需要有序获取,因为可以对在指定范围内的有界数据集进行排序后再处理,有界流的处理也称为批处理。

如果企业中非要技术选型从Spark和Flink这两个主流框架中选择一个来进行流数据处理,我们推荐使用Flink,主(显而)要(易见)的原因为:
Flink灵活的窗口
Exactly Once语义保证

原理


上图是从一个较为高层级的视角,来看应用中各组件的交互协作。如果部署的集群环境不同(例如YARN,Mesos,Kubernetes,Standalone等),其中一些步骤可以被省略,或是有些组件会运行在同一个JVM进程中。
具体地,如果我们将Flink集群部署到YARN上,那么就会有如下的提交流程:

1)Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
2)向Yarn ResourceManager提交任务,ResourceManager分配Container资源
3)通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager
4)ApplicationMaster向ResourceManager申请资源启动TaskManager
5)ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
6)NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

简单使用

Standalone模式
1 这里我们来看看只使用Flink自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式
2 解压缩文件
将下载好的flink-1.10.0-bin-scala_2.11.tgz文件上传到Linux并解压缩,放置在指定位置,路径中不要包含中文或空格
tar -zxvf flink-1.10.0-bin-scala_2.11.tgz -C /opt/module
mv flink-1.10.0 flink
3修改配置文件
1)修改 conf/flink-conf.yaml 文件
jobmanager.rpc.address: hadoop12
2)修改 conf/slaves文件
Hadoop13
Hadoop14
3)分发Flink到其他节点
xsync flink

4 启动Standalone环境
1)执行启动命令
bin/start-cluster.sh

2)访问Web UI对Flink集群和任务进行监控管理
http://hadoop12:8081

5 命令行提交应用程序
1)启动netcat
nc -lk 1111
2)执行脚本命令
bin/flink run -c com.xian.WordC_unBoundedStream /opt/module/data/input/wordcount.jar
6 UI界面提交应用程序
1)准备数据文件
在flink目录下,创建input文件夹并增加word.txt文件,并增加内容
Hi Spark
Hi Flink
2)将数据文件分发到不同的节点
xsync input
3)使用maven工具将之前的WordCount程序打包,然后在UI页面中提交

执行时,如果出现文件找不到的异常,可以在读取的文件路径前增加系统参数
System.getProperty(“user.dir”)
7 HA高可用
任何时候都有一个 主 JobManager 和多个备用 JobManagers,以便在主节点失败时有备用 JobManagers 来接管集群。这保证了没有单点故障,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager 都可以充当主备节点。
1)修改conf/flink-conf.yaml配置文件
配置参数中冒号后面的参数值都需要增加空格
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop12:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop12:2282,hadoop13:2282,hadoop14:2282
2)修改conf/master配置文件
Hadoop12:8081
Hadoop13:8081
3)修改zoo.cfg配置文件
clientPort=2282
server.28=hadoop12:2888:3888
server.29=hadoop13:2888:3888
server.30=hadoop14:2888:3888
4)分发配置文件
xsync conf
5)启动HDFS集群
引入hadoop关联jar包:flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
分发Jar包
启动略
6)启动Flink Zookeeper集群
bin/start-zookeeper-quorum.sh
7)启动Flink HA集群
bin/start-cluster.sh
8)启动成功后,分别访问地址
http://hadoop12:8081
http://hadoop13:8081

Yarn模式
独立部署(Standalone)模式由Flink自身提供计算资源,无需其他框架提供资源,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Flink主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成更靠谱,所以接下来我们来学习在强大的Yarn环境中Flink是如何使用的。(其实是因为在国内工作中,Yarn使用的非常多)
以Yarn模式部署Flink任务时,要求Flink是有Hadoop支持的版本,Hadoop环境需要保证版本在2.2以上,并且集群中安装有HDFS服务。
1 解压缩文件
将flink-1.10.0-bin-scala_2.11.tgz文件上传到Linux并解压缩,放置在指定位置,路径中不要包含中文或空格
tar -zxvf flink-1.10.0-bin-scala_2.11.tgz -C /opt/module
mv flink-1.10.0 flink
2 启动Yarn环境
瞅啥呢,自己启动去!
3 命令行提交应用程序
bin/flink run -m yarn-cluster -c com.xian.WordC_unBoundedStream /opt/module/data/input/wordc.jar
提交时,会出现如下错误

错误的原因是Flink默认情况下类库中是不包含hadoop相关依赖的,所以提交时会发生错误,解决方案非常简单,引入hadoop相关依赖jar包即可:flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
上传后,重新执行上面的指令即可。运行过程可以通过Yarn的应用服务页面查看

应用程序运行结束后,Yarn中的应用程序也自动结束

4 两种集群方式

1)Per-Job-Cluster
在上面的应用程序提交时,一个Job会对应一个yarn-session集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业

这种方式每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
2)Session-Cluster
在规模小执行时间短的作业执行时,频繁的申请资源并不是一个好的选择,所以Flink还提供了一种可以事先申请一定资源,然后在这个资源中并行执行多个作业的集群方式。

在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在yarn集群中,除非手工停止。
Session-Cluster集群模式和Per-Job-Cluster不一样的是需要事先创建Yarn应用后再提交Flink应用程序。
Yarn会按需动态分配TaskManager和slot,其实-n -s参数已经失效。
bin/yarn-session.sh -d -n 2 -s 2 -jm 1024 -tm 1024 -nm test
-n(–container) TaskManager的数量
-s(–slots) 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余
-jm JobManager的内存(单位MB)
-tm 每个Taskmanager的内存(单位MB)
-nm yarn 的appName(现在yarn的ui上的名字)
-d 后台执行,需要放在前面,否则不生效

初步编程


1 Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源。
准备一个WaterSensor类方便演示。
// 定义实体类:水位传感器:用于接收空高数据

public class WaterSensor {// id:传感器编号private String id;// ts:时间戳private Long ts;// vc:空高private Double vc;public Watersensor() {}public Watersensor(String id, Long ts, Double vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(double vc) {this.vc = vc;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}
}

2 从集合读取数据
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。

public class Source_Collection {public static void main(String[] args) throws Exception {// 0.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.从集合中读取数据DataStreamSource<WaterSensor> collectionDS = env.fromCollection(Arrays.asList(new WaterSensor("ws_1001", 1577844001L, 45),new WaterSensor("ws_1002", 1577844015L, 43),new WaterSensor("ws_1003", 1577844020L, 42)));// 2.打印collectionDS.print();// 3.执行env.execute();}
}

3 从文件读取数据
通常情况下,我们会从存储介质中获取数据,比较常见的就是将日志文件作为数据源

public class Source_File {public static void main(String[] args) throws Exception {// 0.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.从集合中读取数据DataStreamSource<String> fileDS = env.readTextFile("input/sensor-data.log");// 2.打印fileDS.print();// 3.执行env.execute();}
}
在读取文件时,文件路径可以是目录也可以是单一文件。如果采用相对文件路径,会从当前系统参数user.dir中获取路径
System.getProperty("user.dir")
如果在IDEA中执行代码,那么系统参数user.dir自动指向项目根目录,如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,也可以将路径设置为分布式文件系统路径,如HDFS
val fileDS: DataStream[String] =env.readTextFile( "hdfs://hadoop12:9000/test/1.txt")

4 从Kafka读取数据
Kafka作为消息传输队列,是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统。在现今企业级开发中,Kafka 和 Flink 成为构建一个实时的数据处理系统的首选
1)引入kafka连接器的依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>2)代码实现参考// 0.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.从kafka中读取Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop12:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaDS = env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));kafkaDS.print("kafka source");env.execute();

4 自定义数据源
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式

1)创建自定义数据源

public static class MySource implements SourceFunction<WaterSensor> {boolean flag = true;@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {Random random = new Random();while (flag) {ctx.collect(new WaterSensor("sensor_" + (random.nextInt(3) + 1),System.currentTimeMillis(),random.nextInt(10) + 40));Thread.sleep(1000L);}}@Overridepublic void cancel() {flag = false;}
}
2)代码实现参考// 0.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.从kafka中读取DataStreamSource<WaterSensor> inputDS = env.addSource(new MySource());inputDS.print();env.execute();

5 Transform

在Spark中,算子分为转换算子和行动算子,转换算子的作用可以通过算子方法的调用将一个RDD转换另外一个RDD,Flink中也存在同样的操作,可以将一个数据流转换为其他的数据流,转换过程中,数据流的类型也会发生变化,那么到底Flink支持什么样的数据类型呢,其实我们常用的数据类型,Flink都是支持的。比如:Long, String, Integer, Int, 元组,样例类,List, Map等。
5.1map
映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
参数:lambda表达式或MapFunction实现类
返回:DataStream

SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map((MapFunction<String, WaterSensor>) value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));});

上面算子方法的使用和Spark学习使用到的算子区别不大,学习起来比较容易,但是Flink在使用各种不同算子的同时,为了能更细粒度的控制数据和操作数据,给开发者提供了对现有函数功能进行扩展的能力,这就是函数类(FunctionClasses)。也可以简单地理解为UDF函数(用户自定义函数)
Flink每一个算子的参数都可以使用lambda表达式和函数类两种的方式,其中如果使用函数类作为参数的话,需要让自定义函数继承指定的父类或实现特定的接口。例如:MapFunction
// 自定义映射转换函数
// 1.实现MapFunction接口,并定义泛型(输入,输出)
// 2.实现方法

public static class MyMapFunction implements MapFunction<String,WaterSensor>{@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}

所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction

public static class MyRichMapFunction extends RichMapFunction<String, WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("open....");}@Overridepublic void close() throws Exception {System.out.println("close....");}
}

Rich Function有一个生命周期的概念。典型的生命周期方法有:
open()方法是rich function的初始化方法,当一个算子例如map或者filter被调 用之前open()会被调用
close()方法是生命周期中的最后一个调用的方法,做一些清理工作
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行 的并行度,任务的名字,以及state状态
5.2 flatMap
扁平映射:将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素
参数:lambda表达式或FlatMapFunction实现类
返回:DataStream

SingleOutputStreamOperator<Integer> resultDS = inputDS.flatMap(new FlatMapFunction<List<Integer>, Integer>() {@Overridepublic void flatMap(List<Integer> value, Collector<Integer> out) throws Exception {for (Integer element : value) {out.collect(element);}}}
);

5.3 filter
过滤:根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
参数:Scala匿名函数或FilterFunction
返回:DataStream

SingleOutputStreamOperator<Integer> filterDS = inputDS.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value > 5;}}
);

5.4 keyBy
在Spark中有一个GroupBy的算子,用于根据指定的规则将数据进行分组,在flink中也有类似的功能,那就是keyBy,根据指定的key对数据进行分流
分流:根据指定的Key的hashcode将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。keyBy()是通过哈希来分区的

参数:Scala匿名函数或POJO属性或元组索引,不能使用数组
返回:KeyedStream

KeyedStream<Tuple2<String,Integer>, Tuple> streamKeyby = stream.keyBy(0);

5.5 select
将数据流进行 后,如何从流中将不同的标记取出呢,这时就需要使用select算子了。

      splitSS.select("normal").print("normal");splitSS.select("warn").print("warn");splitSS.select("alarm").print("alarm");

5.6 connect
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

DataStreamSource<Integer> numDS = env.fromCollection(Arrays.asList(1, 2, 3, 4));
ConnectedStreams<WaterSensor, Integer> resultCS = sensorDS.connect(numDS);

5.7 union
对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream

DataStreamSource<Integer> numDS1 = env.fromCollection(Arrays.asList(1, 2, 3, 4));
DataStreamSource<Integer> numDS2 = env.fromCollection(Arrays.asList(5, 6, 7, 8));
DataStreamSource<Integer> numDS3 = env.fromCollection(Arrays.asList(9, 10));DataStream<Integer> unionDS = numDS1.union(numDS2).union(numDS3);

unionDS.print()
connect与 union 区别:
1)union之前两个流的类型必须是一样,connect可以不一样
2)connect只能操作两个流,union可以操作多个。

6 Operator
Flink作为计算框架,主要应用于数据计算处理上, 所以在keyBy对数据进行分流后,可以对数据进行相应的统计分析
6.1 滚动聚合算子(Rolling Aggregation)
这些算子可以针对KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream
sum()
min()
max()
6.2 reduce
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

 // 0.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.读取数据SingleOutputStreamOperator<WaterSensor> sensorDS = ReadAsWaterSensor.getWaterSensorFromFile(env);// 2.转换成元组SingleOutputStreamOperator<Tuple2<String, Integer>> sensorTuple = sensorDS.map((MapFunction<WaterSensor, Tuple2<String, Integer>>) r -> new Tuple2<String, Integer>(r.getId(), r.getVc())).returns(new TypeHint<Tuple2<String, Integer>>() {});// 3.按照id分组KeyedStream<Tuple2<String, Integer>, String> sensorKS = sensorTuple.keyBy(r -> r.f0);// 4.聚合SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = sensorKS.reduce((v1, v2) -> new Tuple2<String, Integer>(v1.f0, v1.f1 + v2.f1));resultDS.print("reduce");env.execute();

6.3 process
Flink在数据流通过keyBy进行分流处理后,如果想要处理过程中获取环境相关信息,可以采用process算子自定义实现
1)继承KeyedProcessFunction抽象类,并定义泛型:[KEY, IN, OUT]

Public static class MyKeyedProcessFunction extends
KeyedProcessFunction<String, Tuple2<String, Integer>, String>
2)重写方法
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {out.collect("key = " + ctx.getCurrentKey() + ",数据 = " + value);
}

7 Sink

在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作
之前我们一直在使用的print方法其实就是一种Sink

public DataStreamSink<T> print() {PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();return addSink(printFunction).name("Print to Std. Out");
}
//官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink

7.1 Kafka Sink
咱们可以将处理完的数据发送到Kafka消息队列中
1)增加依赖关系:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version>
</dependency>

2)主函数中添加sink:
val dataDS: DataStream[String] =
env.readTextFile(“input/data.txt”)

// 向Kafka中输出内容
dataDS.addSink(new FlinkKafkaProducer011[String](
“hadoop12:9092”,
“sensor”,
new SimpleStringSchema()))
3)通过kafka消费者控制台查看:
bin/kafka-console-consumer.sh --zookeeper hadoop12:2181 --topic sensor
7.2 Redis Sink
咱们可以将处理完的数据发送到Redis缓存数据库中

1)增加依赖关系:
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
2)定义一个Redis的mapper类,用于定义保存到Redis时调用的命令:
// 向Redis中输出内容
val conf = new FlinkJedisPoolConfig
.Builder()
.setHost("hadoop12")
.setPort(6379)
.build()
dataDS.addSink(new RedisSink[String](conf, new RedisMapper[String] {override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET,"sensor")}override def getKeyFromData(t: String): String = {t.split(",")(1)}override def getValueFromData(t: String): String = {t.split(",")(2)}
}))

3)访问redis客户端查看数据:
HGETALL sensor
7.3 ElasticSearch Sink
咱们可以将处理完的数据发送到ElasticSearch搜索服务器中
1)增加依赖关系:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>1.10.0</version>
</dependency>
//2)在主函数中调用:
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("hadoop12", 9200))val ds: DataStream[WaterSensor] = dataDS.map(s => {val datas = s.split(",")WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)}
)val esSinkBuilder = new ElasticsearchSink.Builder[WaterSensor]( httpHosts, new ElasticsearchSinkFunction[WaterSensor] {override def process(t: WaterSensor, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {println("saving data: " + t)val json = new util.HashMap[String, String]()json.put("data", t.toString)val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)requestIndexer.add(indexRequest)println("saved successfully")}
} )
// 启动ES时。请使用es用户
// 访问路径:http://hadoop12:9200/_cat/indices?v
// 访问路径:http://hadoop12:9200/sensor/_search
ds.addSink( esSinkBuilder.build() )

还有很多路要走,加油
每到过节,都很惨!自己渡劫

大数据之Flink的看了就可入门相关推荐

  1. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  2. 手把手教你搭建实时大数据引擎FLINK

    手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...

  3. 想成为云计算大数据Spark高手,看这里!

    Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手. ...

  4. 尚硅谷大数据技术Zookeeper教程-笔记01【Zookeeper(入门、本地安装、集群操作)】

    视频地址:[尚硅谷]大数据技术之Zookeeper 3.5.7版本教程_哔哩哔哩_bilibili 尚硅谷大数据技术Zookeeper教程-笔记01[Zookeeper(入门.本地安装.集群操作)] ...

  5. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  6. 大数据之flink教程

    第一章 Flink简介 1.1  初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...

  7. 【硬刚大数据】Flink在实时在实时计算平台和实时数仓中的企业级应用小结

    欢迎关注博客主页:https://blog.csdn.net/u013411339 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发于 CSDN博客! 本文首发CSDN论坛,未经过官 ...

  8. 「最有用」的特殊大数据:一文看懂文本信息系统的概念框架及功能

    导读:作为一种特殊的大数据,文本数据泛指各种以自然语言形式存在的数据. 目前,我们正处在一个以大数据与人工智能技术为核心的新的工业革命时代,其主要特征是大量各种可利用的数据可以视为一种特殊的生产资料, ...

  9. 大数据开发工程师必看书籍

    书籍是技术学习的源泉,也是很多程序员学习的重要工具.对于大数据的学习,除了要配合全新的大数据技术视频教程之外,我们还要看一下相关的书籍,更容易让我们深入了解学习大数据技术. 大数据学习相关书籍推荐阅读 ...

最新文章

  1. 华东理工大学计算机应用基础,最新华东理工大学计算机应用基础网上作业及全部答案...
  2. 基于java封装的语言_封装在java中的应用
  3. 路径包含空格_5分钟学会:矢量工具与路径-编辑路径
  4. 基础线性规划实现---python
  5. 为什么普通红包自己不能领_腾讯为推广新游王牌战士而豪撒千金?快去看看你能不能领红包...
  6. Flume+Kafka+Spark Streaming+MySQL实时日志分析
  7. 标号1-n的n个人首尾相接,1到3报数,报到3的退出,求最后一个人的标号
  8. Ubuntu16.04安装为知笔记(WizNote)
  9. 配置8086汇编环境
  10. 客户价值分析(聚类)
  11. 美国主要经济数据解注释
  12. 轻松升级各路硬件,简约时尚的大容量机箱,TT挑战者H6上手
  13. 回环检测之DBoW2
  14. BLE安全之SM剖析(3)
  15. 动图图解!既然IP层会分片,为什么TCP层也还要分段?
  16. 首个Nginx windows Stable 版--轻量级Web服务器Nginx 0.7.59
  17. 由浅入深学习Flash制作高射炮游戏(2)
  18. 微信里如何发微博的表情?
  19. frameset框架属性
  20. (4.6.30)组件化:Android项目构架演变之路

热门文章

  1. python 三维装箱可视化图代码
  2. 浅谈是否有永久名校office365教育版a1p桌面版套件的经验浅谈
  3. 计算机动漫价格,动画专业对电脑的价格和配置有什么要求吗
  4. vue 获取安卓原生方法_vue与原生app的对接交互的方法(混合开发)
  5. 《计算机操作系统》试题库——修订版
  6. KAFKA源码阅读——FetchRequestPurgatory, ProducerRequestPurgatory
  7. 代码质量随想录(四)排版,不只是为了漂亮
  8. 大数据漫谈2:大数据价值点在哪里
  9. c语言编写猜数字游戏
  10. 短视频的引爆点在哪?