Preface

Stream processing may not be something you run into every day. It is mostly used to compute PV/UV over a window of time, and it is very common in risk control, for example counting how many orders a user places from the same region within a day to decide whether that user is anomalous. There are also big-data scenarios, such as taking the logs generated over a period of time, transforming them as needed, and loading them into a storage DB for reporting queries. Why learn Flink? First, I recently ran into some real-time computing performance problems; second, I do not really understand how real-time computing is implemented under the hood. So I am taking the very popular open-source tool Flink as my subject of study, digging into its internals step by step to explore how real-time computing works.

The First Program

Add the Maven dependencies. The key ones are:

<properties>
    <blink.version>1.5.1</blink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <blink-streaming.version>1.5.1</blink-streaming.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j-log4j.version>1.7.9</slf4j-log4j.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${blink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${blink.version}</version>
        </dependency>
        <!-- blink stream java -->
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${blink-streaming.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>${blink.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- logging framework -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-log4j.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

The dependency set here is fairly clean: just the core Flink packages plus logging. Next, let's use the Flink API to write a first Hello World program. I am using the official Flink WordCount demo, lightly restructured; the code is as follows:

package com.alibaba.security.blink;

import com.alibaba.security.blink.util.WordCountData;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

    private final ParameterTool params;
    private final ExecutionEnvironment env;

    public WordCount(String[] args) {
        this.params = ParameterTool.fromArgs(args);
        this.env = ExecutionEnvironment.createLocalEnvironment();
        env.getConfig().setGlobalJobParameters(params);
    }

    public static void main(String[] args) throws Exception {
        WordCount wordCount = new WordCount(args);
        DataSet<String> dataSet = wordCount.getDataSetFromCommandLine();
        wordCount.executeFrom(dataSet);
    }

    private DataSet<String> getDataSetFromCommandLine() {
        DataSet<String> text;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }
        return text;
    }

    private void executeFrom(DataSet<String> text) throws Exception {
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .groupBy(0)
                        .sum(1);
        if (this.params.has("output")) {
            counts.writeAsCsv(this.params.get("output"), "\n", " ");
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

WordCountData.java is as follows:

package com.alibaba.security.blink.util;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {

    public static final String[] WORDS = new String[] {
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--'tis a consummation",
        "Devoutly to be wish'd. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there's the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there's the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor's wrong, the proud man's contumely,",
        "The pangs of despis'd love, the law's delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscover'd country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o'er with the pale cast of thought;",
        "And enterprises of great pith and moment,",
        "With this regard, their currents turn awry,",
        "And lose the name of action.--Soft you now!",
        "The fair Ophelia!--Nymph, in thy orisons",
        "Be all my sins remember'd."
    };

    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}

At runtime, if the command-line argument --input is given, the content is read from the specified input file; otherwise it falls back to WordCountData.

A Flink program starts from org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironment(), which launches Flink locally. In the batch API, Flink only operates on DataSets, so any data you want Flink to process must first be converted into a DataSet; here the text file is converted by calling org.apache.flink.api.java.ExecutionEnvironment#readTextFile. The executeFrom method is the core processing flow: each line of text is first broken apart into Tuple2 objects (a Tuple2 is simply a key-value pair). The resulting Tuple2 collection is then grouped with groupBy so that identical words land in the same group, and finally the counts of identical words are added up (sum), yielding the number of occurrences of each word.
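
Besides readTextFile, ExecutionEnvironment offers several other entry points for turning data into a DataSet. A minimal sketch (the class name DataSetSources and the file path are placeholders for illustration):

import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetSources {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        // from an in-memory array of elements
        DataSet<String> fromElements = env.fromElements("to be", "or not to be");

        // from a java.util.Collection
        DataSet<Integer> fromCollection = env.fromCollection(Arrays.asList(1, 2, 3));

        // from a text file, one String element per line (placeholder path)
        DataSet<String> fromFile = env.readTextFile("/tmp/input.txt");

        fromElements.print();
        fromCollection.print();
    }
}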

API Usage

flatMap

flatMap is not used that often in plain Java either; map is far more common. Here is a flatMap demo written with the JDK 1.8 Stream API:

import org.apache.commons.lang3.tuple.Pair;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Scratch {
    public static void main(String[] args) {
        String str = "abc,debf";
        String[] strArray = str.split(",");

        // flatMap: each input element may expand into multiple output elements (a Stream)
        List<Object> result = Arrays.stream(strArray)
                .flatMap(new Function<String, Stream<?>>() {
                    @Override
                    public Stream<?> apply(String s) {
                        return Stream.of(s.split("b"));
                    }
                })
                .collect(Collectors.toList());
        System.out.println(result);

        // map: each input element maps to exactly one output element
        List<Pair<String, Integer>> tupleResultList = Arrays.stream(strArray)
                .map(new Function<String, Pair<String, Integer>>() {
                    @Override
                    public Pair<String, Integer> apply(String s) {
                        return Pair.of(s, s.length());
                    }
                })
                .collect(Collectors.toList());
        System.out.println(tupleResultList);
    }
}

The string str is first split into an array; then, for each string in that array, we split again, wrap the resulting string array into a new Stream, and return it. In other words, flatMap turns one into many: one stream becomes multiple streams.

Every element of the original stream is traversed, each element is turned into a Stream of its own, and the final result is all of those Streams flattened into a single one.

With map, by contrast, each input element can only produce a single output element.

flatMap and map in Flink work on the same principle. The Flink example above uses flatMap: every word extracted from a line is pushed into the Collector, and that Collector's output then feeds the subsequent groupBy. What would it look like with map instead? The code is as follows:


// map returns exactly one Tuple2 per invocation, so this version can only
// count the FIRST word of each line
AggregateOperator<Tuple2<String, Integer>> firstSumResult = text
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] tokens = value.toLowerCase().split("\\W+");
                if (tokens.length > 0) {
                    return new Tuple2<String, Integer>(tokens[0], 1);
                }
                // map must always return a value; returning null here would fail at
                // runtime, so emit a placeholder tuple for blank lines instead
                return new Tuple2<String, Integer>("", 0);
            }
        })
        .groupBy(0)
        .sum(1);
List<Tuple2<String, Integer>> result = firstSumResult.collect();
result.forEach(e -> LOGGER.info("word={},count={}", e.f0, e.f1));

As you can see, MapFunction#map has a return value, and that value is a single element; the subsequent groupBy operates on the collection produced by map. This is also why the version above can only count the first word of each line: map has no way to emit one tuple per word.

So my personal rule of thumb for choosing between map and flatMap: if you are just traversing the DataSet and converting one object into another object, use map; if one object needs to become multiple objects, use flatMap.
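
One more property that tips the balance: flatMap may also emit zero elements, so it can transform and filter in a single pass. A hypothetical sketch; the DataSet<String> named lines and its "userId,amount" record format are assumptions for illustration:

// Parse "userId,amount" lines, silently dropping lines that don't split into
// exactly two fields; map cannot do this, since it must return one value per input
DataSet<Tuple2<String, Double>> orders = lines.flatMap(
        new FlatMapFunction<String, Tuple2<String, Double>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Double>> out) {
                String[] parts = line.split(",");
                if (parts.length == 2) {
                    out.collect(new Tuple2<>(parts[0], Double.parseDouble(parts[1])));
                }
            }
        });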

groupBy

groupBy is another standard API on DataSet. It has three overloads:

groupBy(int... fields)

This overload only works on DataSets whose elements are Flink Tuples. So what Tuple classes are there? Flink ships Tuple0 through Tuple25, fixed-size tuples holding up to 25 typed fields, under org.apache.flink.api.java.tuple.
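
A quick sketch of how these tuples are used (Tuple2.of and Tuple3.of are Flink's static factories):

Tuple2<String, Integer> pair = Tuple2.of("apple", 3);            // 2 typed fields
Tuple3<String, Integer, Double> triple = Tuple3.of("apple", 3, 1.5d);
System.out.println(pair.f0 + " -> " + pair.f1);                  // fields are public: f0, f1, ...
System.out.println(triple.f2);                                   // 1.5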

Here is an example using this flavor of groupBy:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author shaoxian.ssx
 * @date 2021/11/7
 */
public class SimpleGroupBy {

    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleGroupBy.class);

    public static void main(String[] args) throws Exception {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        List<Tuple2<String, Integer>> list = new ArrayList<>(16);
        Random random = new Random();
        for (int i = 0; i < 16; i++) {
            int num = random.nextInt(20);
            list.add(new Tuple2<String, Integer>(String.valueOf(num), num));
        }
        LOGGER.info("listResult={}", list);
        List<Tuple2<String, Integer>> flinkRes = env.fromCollection(list)
                .groupBy(0)
                .sum(1)
                .collect();
        LOGGER.info("flinkResult={}", flinkRes);
    }
}

The output is:

16:41:33,708 [           main] INFO  com.alibaba.security.blink.SimpleGroupBy                      - listResult=[(1,1), (9,9), (14,14), (18,18), (1,1), (7,7), (16,16), (1,1), (4,4), (16,16), (15,15), (17,17), (16,16), (0,0), (12,12), (15,15)]

16:41:37,573 [           main] INFO  com.alibaba.security.blink.SimpleGroupBy                      - flinkResult=[(12,12), (18,18), (9,9), (14,14), (15,30), (7,7), (16,48), (17,17), (4,4), (0,0), (1,3)]

The example shows that groupBy(int... fields) only works when the DataSet's element type is one of the Tuple classes. The fields arguments are field positions within the Tuple: 0 refers to the Tuple's field 0, and so on.
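
Since fields is a varargs, you can also group by a composite key. A minimal sketch, assuming Tuple3 records of (city, product, quantity) and the env from the demo above:

// Assumed sample data: (city, product, quantity)
DataSource<Tuple3<String, String, Integer>> sales = env.fromElements(
        Tuple3.of("hangzhou", "phone", 2),
        Tuple3.of("hangzhou", "phone", 5),
        Tuple3.of("beijing", "phone", 1));

// Group by field 0 (city) AND field 1 (product), then sum field 2 (quantity)
List<Tuple3<String, String, Integer>> totals = sales.groupBy(0, 1).sum(2).collect();
LOGGER.info("totals={}", totals);   // e.g. [(hangzhou,phone,7), (beijing,phone,1)]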

groupBy(String... fields)

This overload targets DataSets of POJO types. The arguments are POJO property names; each property must have public setter and getter methods, and the POJO must have a public no-argument constructor. As an example, let's count how many distinct IPs each user has placed orders from:

import java.util.List;
import java.util.StringJoiner;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author shaoxian.ssx
 * @date 2021/11/7
 */
public class SimpleGroupBy {

    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleGroupBy.class);

    public static void main(String[] args) throws Exception {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        Order order1 = new Order(1001L, "张三", "192.168.1.10");
        Order order2 = new Order(1002L, "李四", "192.168.1.212");
        Order order3 = new Order(1001L, "张三", "192.168.1.50");
        Order order4 = new Order(1003L, "王五", "192.168.1.71");
        DataSource<Order> dataSource = env.fromElements(order1, order2, order3, order4);
        List<Order> result = dataSource.groupBy("byrId")
                .reduce(new ReduceFunction<Order>() {
                    @Override
                    public Order reduce(Order value1, Order value2) throws Exception {
                        // value1 is the accumulated order; bump the count when the incoming
                        // IP differs from the one retained in value1. Note this pairwise
                        // check only approximates a distinct count; it is good enough for
                        // this demo data, but a real job would accumulate a Set of IPs.
                        if (!value1.getIp().equals(value2.getIp())) {
                            value1.setIpCount(value1.getIpCount() + value2.getIpCount());
                            return value1;
                        }
                        return value1;
                    }
                })
                .collect();
        result.forEach(e -> LOGGER.info("order={}", e));
    }

    /**
     * Must be public, otherwise Flink's type validation will fail.
     */
    public static class Order {

        private Long byrId;
        private String name;
        private String ip;
        private int ipCount = 1;

        // a public no-arg constructor is mandatory
        public Order() {
        }

        public Order(Long byrId, String name, String ip) {
            this.byrId = byrId;
            this.name = name;
            this.ip = ip;
        }

        // setters and getters omitted ...

        @Override
        public String toString() {
            return new StringJoiner(", ", Order.class.getSimpleName() + "[", "]")
                    .add("byrId=" + byrId)
                    .add("name='" + name + "'")
                    .add("ip='" + ip + "'")
                    .add("ipCount=" + ipCount)
                    .toString();
        }
    }
}

The output is:

17:08:02,922 [           main] INFO  com.alibaba.security.blink.SimpleGroupBy                      - order=Order[byrId=1001, name='张三', ip='192.168.1.10', ipCount=2]
17:08:02,922 [           main] INFO  com.alibaba.security.blink.SimpleGroupBy                      - order=Order[byrId=1003, name='王五', ip='192.168.1.71', ipCount=1]
17:08:02,922 [           main] INFO  com.alibaba.security.blink.SimpleGroupBy                      - order=Order[byrId=1002, name='李四', ip='192.168.1.212', ipCount=1]

groupBy(KeySelector<T, K> keyExtractor)

Personally I find this overload much the same as the property-name groupBy above, just nicer to write. It also targets POJO-style data sources, or more precisely anything with publicly accessible properties; Tuple2's f0 and f1 work too. Rewriting the previous example with a KeySelector:

// Tell flink to group by the byrId value of each Order object
List<Order> result = dataSource.groupBy(new KeySelector<Order, Long>() {
            @Override
            public Long getKey(Order value) throws Exception {
                return value.getByrId();
            }
        })
        .reduce(new ReduceFunction<Order>() {
            @Override
            public Order reduce(Order value1, Order value2) throws Exception {
                if (!value1.getIp().equals(value2.getIp())) {
                    value1.setIpCount(value1.getIpCount() + value2.getIpCount());
                    return value1;
                }
                return value1;
            }
        })
        .collect();
result.forEach(e -> LOGGER.info("order={}", e));
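
The same selector style works on tuples, as mentioned above. A small sketch; the DataSet<Tuple2<String, Integer>> of (word, 1) pairs named pairs is an assumption for illustration:

// KeySelector works on Tuples as well: key the (word, 1) pairs by f0
pairs.groupBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> value) {
        return value.f0;   // use the word itself as the grouping key
    }
}).sum(1).print();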

UnsortedGrouping

UnsortedGrouping is the object returned by groupBy. Why dwell on groupBy? Because grouping is the most common scenario in streaming computation: group by product ID to find which product sells the most, group by address to gauge how concentrated addresses are, and so on. In SQL, a GROUP BY is usually followed by a COUNT; how is that done in Flink? The final aggregation methods all live on this UnsortedGrouping class. Counting here takes the form of a reduce operation, with the computation logic wrapped in a ReduceFunction. In the distinct-IP example above, the reduce adds 1 for each differing IP, so once the reduce finishes, the ipCount in the resulting Order object is the accumulated IP total. UnsortedGrouping has plenty of other useful methods too, such as maxBy, minBy, and sum. Here is a demo:

import java.util.List;
import java.util.StringJoiner;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author shaoxian.ssx
 * @date 2021/11/7
 */
public class SimpleGroupBy {

    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleGroupBy.class);

    public static void main(String[] args) throws Exception {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        Order order1 = new Order(1001L, "张三", "192.168.1.10", 30d);
        Order order2 = new Order(1002L, "李四", "192.168.1.212", 27d);
        Order order3 = new Order(1001L, "张三", "192.168.1.50", 100d);
        Order order4 = new Order(1003L, "王五", "192.168.1.71", 30d);
        DataSource<Order> dataSource = env.fromElements(order1, order2, order3, order4);
        // First map each Order to a Tuple2, then group by buyer ID, and finally
        // pick the order with the largest amount within each group.
        List<Tuple2<Double, Order>> result = dataSource
                .map(new MapFunction<Order, Tuple2<Double, Order>>() {
                    @Override
                    public Tuple2<Double, Order> map(Order value) throws Exception {
                        return new Tuple2<>(value.getTotal(), value);
                    }
                })
                .groupBy("f1.byrId")
                .maxBy(0)
                .collect();
        result.forEach(e -> LOGGER.info("order={}", e));
    }

    /**
     * Must be public, otherwise Flink's type validation will fail.
     */
    public static class Order {

        private Long byrId;
        private String name;
        private String ip;
        // total order amount
        private double total;
        private int ipCount = 1;

        // a public no-arg constructor is mandatory
        public Order() {
        }

        public Order(Long byrId, String name, String ip, double total) {
            this.byrId = byrId;
            this.name = name;
            this.ip = ip;
            this.total = total;
        }

        // setters and getters omitted ...

        public void setTotal(double total) {
            this.total = total;
        }

        @Override
        public String toString() {
            return new StringJoiner(", ", Order.class.getSimpleName() + "[", "]")
                    .add("byrId=" + byrId)
                    .add("name='" + name + "'")
                    .add("ip='" + ip + "'")
                    .add("ipCount=" + ipCount)
                    .add("total=" + total)
                    .toString();
        }
    }
}
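
One nuance worth calling out, since it is easy to trip over: max(n) only guarantees that field n of the result is the group maximum, while the remaining fields are not guaranteed to come from the same record; maxBy(n) returns a complete element that actually carries the maximum. A minimal sketch, assuming env is the LocalEnvironment from the demo above:

// Sample (buyer, orderAmount) pairs, assumed data for illustration
DataSource<Tuple2<String, Integer>> orders = env.fromElements(
        Tuple2.of("zhangsan", 30),
        Tuple2.of("zhangsan", 100),
        Tuple2.of("lisi", 27));

// maxBy(1): for each buyer, the complete tuple holding the largest amount
orders.groupBy(0).maxBy(1).print();   // (zhangsan,100) and (lisi,27)

// sum(1): total amount per buyer
orders.groupBy(0).sum(1).print();     // (zhangsan,130) and (lisi,27)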

Summary

The demos and APIs covered today are enough to get started with Flink. The plan is to keep investing in this series, with more exploration of the API surface and a dive into Flink's internals.
