Flink Environment

  1. getExecutionEnvironment()

    根据当前平台, 获取对应的执行环境, 若未设置并行度, 使用 flink-conf.yaml 中的并行度配置, 默认 1.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. createLocalEnviroment()

    创建本地环境, 并行度默认为 CPU 核数, 也可在构造函数中传参设置
    LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    
  3. createRemoteEnviroment()

    创建远程环境, 将 jar 提交到远程环境执行
    StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 7777, "/home/WordCount.jar");
    

Flink 输入源

  1. 使用集合数据作为输入源

    env.fromCollection(new ArrayList<>());
    env.fromElements(1, 2, 3);
    
  2. 使用文件作为输入源
    env.readTextFile("/home/test.txt");
    
  3. 使用消息队列作为输入源
    如下, 使用 Kafka 作为输入源

    引入连接器依赖:
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version>
    </dependency>env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
    
  4. 用户自定义输入源(实现 SourceFunction 接口)
    主要用于测试, 定义假数据.

具体实操代码如下:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import java.util.Arrays;
import java.util.Properties;
import java.util.Random;/*** @author regotto*/
public class SourceTest {private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();private static void readFromCollectionAndElement() {/*从集合中读取, SensorReading 自定义实体(String id, Long timestamp, Double temperature)*/DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(new SensorReading("1", 1111L, 35.1),new SensorReading("2", 2222L, 32.1),new SensorReading("3", 3333L, 33.1),new SensorReading("4", 12345L, 36.1)));DataStreamSource<Integer> elements = env.fromElements(1, 2, 3, 4, 5);dataStream.print("data");elements.print("int");}private static void readFromText() {DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");dataStream.print();}private static void readFromKafka() {Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9999");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));dataStream.print();}/*** 用户自定义输入源*/private static void readFromUserDefine() {// 实现 SourceFunction 接口, run 方法中定义数据并使用 collect 资源输出DataStream<SensorReading> dataStream = env.addSource(new SourceFunction<SensorReading>() {private volatile boolean running = true;public void run(SourceContext<SensorReading> ctx) throws Exception {Random random = new Random();while (running) {for (int i = 0; i < 10; i++) {ctx.collect(new SensorReading(i + "", System.currentTimeMillis(), random.nextGaussian()));}}}public void cancel() {running = false;}});dataStream.print();}public static void main(String[] args) throws Exception {readFromUserDefine();env.execute();}
}

Transform

映射转换算子

  1. map: 将数据一一映射
  2. flatMap: 将数据打散后进行映射
  3. filter: 对数据进行过滤

聚合转换算子

  1. keyBy: 聚合操作, 将一个流 hash 运算拆分为不相交的分区, 每个分区包含相同key
    滚动聚合: sum, min, max, minBy(), maxBy();
  2. reduce: 聚合操作, 合并当前元素与上次聚合的结果, 返回流包含所有聚合的结果

多流转换算子

  1. split 和 select: 根据某些特征将 DataStream 拆分为 2 个或 多个 DataStream
    split: 将 DataStream 打上标签.
    select: 将打上标签的 DataStream 进行一个拆分.
  2. Connect 和 CoMap: 2个 DataStream 包装为 1 个 DataStream
    connect: 包装后内部流依旧保持各自的状态, 流与流之间相互独立
    coMap/coFlatMap: 对 connect 操作后的流, 进行 map/flatMa 合并操作
  3. union: 将 2 个以上相同类型的 DataStream 合并为同一个流

具体实操代码如下:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;/*** @author regotto*/
public class TransformTest {private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();public static void main(String[] args) throws Exception {DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");env.setParallelism(1);// map, 映射操作, 将数据映射封装为 SensorReadingDataStream<SensorReading> map = dataStream.map(value -> {String[] fields = value.split(",");return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));});map.print("map");// flatMap, 将原来的数据打散然后映射dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {for (String s : value.split(",")) {out.collect(s);}}}).print("flatMap");// filter, 过滤器dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");// map 进行滚动聚合求当前温度最大值, keyBy 可以用指定位置, 属性, 自定义 keySelectorKeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);keyedStream.max("temperature").print("max temperature");// reduce 聚合, 求最大温度下的最大时间戳记录keyedStream.reduce(new ReduceFunction<SensorReading>() {@Overridepublic SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {return new SensorReading(curData.getId(), newData.getTimestamp(), Math.max(curData.getTemperature(), newData.getTemperature()));}}).print("最大温度下的最新时间");// split&select 根据温度把数据分为高温, 低温SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return value.getTemperature() > 36 ? Collections.singletonList("high") : Collections.singletonList("low");}});DataStream<SensorReading> high = splitStream.select("high");DataStream<SensorReading> low = splitStream.select("low");DataStream<SensorReading> all = splitStream.select("high", "low");high.print("高温流");low.print("低温流");all.print("all");// connect&coMap, 将高温处理为二元组, 与低温进行合并, 输出状态信息ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream =high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(SensorReading value) throws Exception {return new Tuple2<>(value.getId(), value.getTemperature());}}).connect(low);connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {@Overridepublic Object map1(Tuple2<String, Double> value) throws Exception {return new Tuple3<>(value.f0, value.f1, "高温报警");}@Overridepublic Object map2(SensorReading value) throws Exception {return new Tuple2<>(value.getId(), "温度正常");}}).print("connect&coMap");// 使用 union 合并 hig, lowhigh.union(low, all).print("union");env.execute();}
}

算子运算转化图:

RichMapFunction

对于 MapFunction的增强, 可以获取 RuntimeContext, 一个运行上下文代表一个分区, 每个分区创建销毁都执行 open, close 操作, 对资源预处理, 资源销毁进行操作, 继承 RichMapFunction重写 open, close 实现资源预处理与回收操作. 使操作更为灵活, 其余 RichXXX操作同理.

遇到的问题

写函数的时候, 把匿名内部类简写为 lambda 表达式, 导致泛型擦除的问题, 出现报错:
The generic type parameters of ‘Collector’ are missing. In many cases lambda methods don’t provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the ‘org.apache.flink.api.common.functions.FlatMapFunction’ interface. Otherwise the type has to be specified explicitly using type information.

总结

数据在运算, 转化过程, 一定要搞清楚, 输入是啥, 输出是啥.

FlinkAPI_Environment_输入源_算子转化流程相关推荐

  1. 快捷键实现MAC或者WINDOWS下快速切换显示器输入源

    目录 通过键盘实现mac或者Windows显示器输入信号源快捷键切换(电脑怎么切换屏幕的信号输入) 前言 一.通过显示器菜单按钮控制 二.通过快捷键设置 1.window系统 对ControlMyMo ...

  2. 【大数据开发】SparkStreaming——DStream输入源、原语、SparkStream与Kafka和Redis三者的交互

    设置SparkConf的时候不能设置为local,会报错,应当设置成local[N],N>1.这是因为需要一个核接收数据,另一个核处理数据,如果只分配一个线程处理,这个线程会被用来接收数据,就没 ...

  3. 2021年大数据Spark(四十五):Structured Streaming Sources 输入源

    目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 ​​​​​​​文件数据源-了解 ​​​​​​​需求 ​​​​​​​代码实现 ​​​​​​​Rate source-了解 So ...

  4. 【深度学习】图像输入网络必要的处理流程

    [深度学习]图像输入网络必要的处理流程 文章目录 1 图像处理之灰度转化 2 归一化 3 CLAHE 4 伽马矫正 5 Data augmentation5.1 裁剪(Crop)5.2 缩放ÿ

  5. MindSpore GPU异构算子全流程开发指导

    Tips ① 此文档详细介绍了MindSpore GPU异构算子开发流程,与官方文档相比本文档更加侧重于开发文件的解读以及常用开发方法的讲解.同时本文档用词相对简单,主要帮助大家了解GPU算子开发需要 ...

  6. .jpg图片转化流程详解

    .jpg文件转化流程详解 概要 色彩空间转换 缩减采样 图像分成8×8像素块 离散余弦变换 量化 压缩 理解质量参数 总结 原文地址 本篇文章翻译自谷歌出的优化视频里面的光头佬(Colt McAnli ...

  7. python处理回显_Python中getpass模块无回显输入源码解析

    本文主要讨论了python中getpass模块的相关内容,具体如下. getpass模块 昨天跟学弟吹牛b安利Python标准库官方文档的时候偶然发现了这个模块.仔细一看内容挺少的,只有两个主要api ...

  8. 渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

    关注我 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/11/es-code02/ 前提 上篇文章写了 ElasticSearch 源码解析 -- ...

  9. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread 流程分析 二 )

    文章目录 前言 一.ActivityManagerService.attachApplicationLocked 二.ActivityStackSupervisor.attachApplication ...

最新文章

  1. 客户跟进节奏(转至索菲外贸日记)
  2. Python中json模块的使用,以及json.loads()和json.dumps()的区别
  3. maven scope范围
  4. Fix Corrupt Blocks on HDFS
  5. redis 集群 搭建(非哨兵)
  6. 2018美赛B题翻译
  7. solidworks电气元件3d库_送软件 | 零基础也可以学的EPLAN电气设计实战教程
  8. java oa系统消息推送_第三方系统向泛微OA系统推送消息
  9. 计算机的内存的材料是什么,内存条到底是干啥的?手把手的告诉你
  10. python量化投资系统构建_零基础搭建量化投资系统 以Python为工具
  11. ORACLE 正负数分开排序 SQL
  12. python经典代码
  13. ofo小黄车仍在自动续费!如何关闭微信自动扣费?
  14. lect01_codes_高阶语法
  15. FlexRay总线协议快速入门、深度剖析与应用示例
  16. Linux项目:音乐播放器
  17. matlab声音的滤波处理,MATLAB声音信号的采集与滤波处理
  18. 爬虫 - 收藏集 - 掘金
  19. setTimeout()的用法
  20. 《啊哈!算法》学习心得

热门文章

  1. 参加 CSDN 2009 英雄大会有感(一)
  2. 【计算机网络】wireshark数据流追踪、图像抓取(转)
  3. Could not load driverClass “com.mysql.jdbc.Driver“
  4. 机器学习算法之 K-means、层次聚类,谱聚类
  5. 机器学习算法之隐马尔可夫模型
  6. Django之templates模板
  7. jQuery中的on 和事件委派
  8. SQL中基于代价的优化
  9. Yarn在MapReduce中的工作机制
  10. 网络:TCP通讯之 time_wait 状态