摘自Flink官网https://flink.apache.org/

最近看到公司有Flink平台,正好做过storm和spark streaming上的业务,借着这个机会把flink也学了。正好比较下他们之间的优缺点。

一、流式处理平台

1.Storm

Topology为处理拓扑图

组成:

(1)Spout. 数据分发中心。

(2)Bolt. 数据处理中心

数据单元为Tuple。在Bolt处理完的数据可以发射给下一个Bolt。此时接收到的为Tuple。

缺点:

(1)消息传输保证为At least once. 但是可能出现重复发消息的情况。对每一条数据都做ack,所以容错的开销很大。

(2)延迟比flink大。

(3)吞吐量不如flink

(4)不支持批处理

2.Spark Streaming

(1)比较主流的实时计算引擎。但是是居于micro batch处理,并不是纯正的流式处理。

(2)支持处理时间,Structured streaming 支持处理时间和事件时间,同时支持 watermark 机制处理滞后数据。

(3)与Hadoop家族组件交互良好,例如Hbase等。

(4)容错机制,checkpoint。

(5)Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数.

(6)数据单元是RDD,新增了Dstream.直接度kafka获得。

(7)处理过程大致是transformation和action。

3.Flink

(1)数据形式DataStream(Streaming),DataSet(Batch)。

(2)处理过程是Source,Transformation 和 sink。

(3)时间。创建时间EventTime, 进入Flink DataFlow的时间。IngestionTime,对事件进行处理的本地系统时间Processing Time。

(4)窗口。按分割标准划分:timeWindow、countWindow。按窗口行为划分:Tumbling Window、Sliding Window、自定义窗口。

(5)轻量级容错机制。保证Execatly once执行。使用stream replay 和 checkpointing容错。

二、各个组件的介绍

1.JobManager用来分配任务,也就是常说的master

2.TaskManager用来分发task,缓存和交换数据流

3.Slot,把TaskManager根据task把内存抽象很多个slot,用来执行task。

三、Mac系统下安装Flink

Mac下很方便,mac装东西确实是方便。------brew install apache-flink

四、启动

1.启动本地集群环境,很快就能启动起来。在/usr/local/Cellar/apache-flink/1.7.0/libexec目录下。

./bin/start-cluster.sh 

2.然后在 http://localhost:8081/#/overview 就可以看见Flink的监控平台。

可以看到Task Managers是1个。Slots也是一个。

下面还有好几个选项,可以看到你的集群配置环境。

五、Example

WordCount

(1)Code分析

 1 package flinkjob;
 2
 3 import org.apache.flink.api.common.functions.FlatMapFunction;
 4 import org.apache.flink.api.common.functions.ReduceFunction;
 5 import org.apache.flink.api.java.utils.ParameterTool;
 6 import org.apache.flink.streaming.api.datastream.DataStream;
 7 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 8 import org.apache.flink.streaming.api.windowing.time.Time;
 9 import org.apache.flink.util.Collector;
10
11 /**
12  * Created by adrian.wu on 2018/12/17.
13  */
14 public class SocketWindowWordCount {
15     public static class WordWithCount {
16
17         public String word;
18         public long count;
19
20         public WordWithCount() {}
21
22         public WordWithCount(String word, long count) {
23             this.word = word;
24             this.count = count;
25         }
26
27         @Override
28         public String toString() {
29             return word + " : " + count;
30         }
31     }
32
33
34     public static void main(String[] args) throws Exception{
35         final int port;
36         try {
37             //得到提交时候的参数
38             final ParameterTool params = ParameterTool.fromArgs(args);
39             //得到端口号,因为这个例子是监听9000端口的例子
40             port = params.getInt("port");
41         } catch (Exception e) {
42             System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
43             return;
44         }
45         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
46
47         //数据单元 DataStream
48         DataStream<String> text = env.socketTextStream("localhost", port, "\n");
49         DataStream<WordWithCount> windowCounts = text
50                 .flatMap(new FlatMapFunction<String, WordWithCount>() {   //map
51                     @Override
52                     public void flatMap(String value, Collector<WordWithCount> out) {
53                         for (String word : value.split("\\s")) {
54                             out.collect(new WordWithCount(word, 1L));
55                         }
56                     }
57                 })
58                 .keyBy("word")
59                 .timeWindow(Time.seconds(5), Time.seconds(1))  //Window function, 5秒一个window,间隔1
60                 .reduce(new ReduceFunction<WordWithCount>() {
61                     @Override
62                     public WordWithCount reduce(WordWithCount a, WordWithCount b) {  //reduce
63                         return new WordWithCount(a.word, a.count + b.count);
64                     }
65                 });
66         windowCounts.print().setParallelism(1);
67
68         env.execute("Socket Window WordCount");
69
70     }
71 }

(2)打包提交代码

 ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000 #提交job

 nc -l 9000 #监听端口

tail -f log/flink-*-taskexecutor-*.out #查看log

(3)在监控平台可以看到你的job情况

转载于:https://www.cnblogs.com/ylxn/p/10038546.html

Flink 学习(一)相关推荐

  1. 年度总结 | 2020 Flink 学习路线总结

    2020年,最后几天了,不管这一年过的怎么样,也都过来了,来年还是得继续努力呀.大数据学习指南给大家整理了一份年度总结系列文章,今天分享的是 Flink 学习路线. 以下资料来源都有标注,基本都属于一 ...

  2. 2020 年 Flink 学习资料整合,建议收藏

    精选30+云产品,助力企业轻松上云!>>> 点击蓝色"大数据每日哔哔"关注我 加个"星标",第一时间获取大数据架构,实战经验 以下资料来源都有 ...

  3. Flink学习-DataStream-HDFSConnector(StreamingFileSink)

    Flink学习-DataStream-HDFSConnector(StreamingFileSink) Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好 ...

  4. 全网第一 | Flink学习面试灵魂40问答案,文末有福利!

    大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 来源:王知无 作者:王知无 By 暴走大数据 场景描述:这是一份Flink学习面试指北.看看你搞清楚 ...

  5. Flink学习4-流式SQL

    Flink学习4-流式SQL Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 介绍Flink Table Sql API相关概念,还会提供 ...

  6. Flink学习1-基础概念

    Flink学习1-基础概念 Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 本文是作者学习Flink的一些文档整理.记录和心得体会,希望与 ...

  7. Flink学习笔记:Operators之CoGroup及Join操作

    本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...

  8. Flink学习1——运行时架构(standalone模式)

    本篇主要讲述Flink Standalone模式下的运行时架构以及各个组件负责的功能,Flink的运行方式有很多,但都大同小异,本文基本可以满足对flink运行时架构的学习. 正文 Flink系统是主 ...

  9. Flink学习笔记【巨详细!】(二)

    第 3 章 3.1.1 环境配置 Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境.我们在进行Flink 安装部署的学习时,需要准备 3 台 Linux 机器.具体要求如下: ...

  10. 关于Flink学习选哪家好 硅谷or黑马?

    尚硅谷有两个 java版(录制) scala版(录制) 黑马的大致有两个系列 flink1.12(课堂java版)–>这个老师讲过spark(scala版)推荐 flink入门到精通(录制) 怎 ...

最新文章

  1. 云服务器上mysql数据库环境安装配置
  2. oracle服务器找不到怎么解决,简析Oracle数据库常见问题及解决方案
  3. .NET Core Runtime vs .NET Framework Runtime
  4. werkzeug.local
  5. 【博客美化】09.评论带头像,且支持旋转
  6. linux向用户发送消息,Linux终端中向记录的用户发送消息
  7. 《AI算法工程师手册》读书笔记(1)
  8. 最全的Vista破解激活工具——成功激活并通过正版验证
  9. 计算机网络操作系统的主要功能和类型,操作系统的五大管理功能和四大分类
  10. SNAKER 工作流
  11. Python批量处理表格有用吗_python批量读入图片、处理并批量输出(可用于深度学习训练集的制作)...
  12. 电脑硬盘中毒了怎么办?u盘中毒数据丢失怎么恢复
  13. Jenkins——Jenkins介绍+基于云平台的Jenkins安装和持续集成环境配置(插件+用户权限+凭据+Maven打包)
  14. Java设计模式(03) -- 里氏替换原则
  15. 使用synergy/barrier服务端和客户端连接不上的问题
  16. 盛世昊通董车长2.0再上新,做任务吸粉看视频得收益
  17. Linux网络编程基础API
  18. Exchange Server 2007+0ffice Communication Server 2007构建统一消息平台
  19. go-micro使用Consul做服务发现的方法和原理
  20. JSONObject对象常用方法讲解--fromObject和toBean

热门文章

  1. Python实现计数排序
  2. Python列表排序 list.sort方法和内置函数sorted
  3. django项目的邮件发送及redis数据库应用
  4. 【opencv学习】【形态学】【腐蚀与膨胀】【开运算与闭运算】【礼帽和黑帽】
  5. win7找回开机密码_电脑密码忘记了?教你四步轻松找回电脑开机密码
  6. 深度学习笔记--多层感知器以及BP算法
  7. MySQL统计函数GROUP_CONCAT使用及报错分析
  8. Unity 4 3 制作一个2D横版射击游戏
  9. django的视图与模板
  10. java获取cpu核数_vn.py社区精选12 - 策略参数优化,你需要懂得压榨CPU!