Flink Learning (5): DataStream API
Contents
- Programming Model
- Source
- Collection-based Source
- File-based Source
- Socket-based Source
- Custom Source: MySQL
- Transformation
- Basic Operations
- Merging and Splitting
- Partitioning
- Sink
- Predefined Sinks
- Custom Sink
Programming Model
A Flink program is written in three main steps:
- data source: obtain the input data, e.g. from a collection, a file, a socket, or MySQL
- transformation: apply the computation logic to the data
- sink: output or store the processed results
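Before the Flink examples below, the three-step shape can be sketched in plain Java — no Flink API involved; `PipelineSketch` and its method names are purely illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// A plain-Java analogy of the three-step model: source -> transformation -> sink.
public class PipelineSketch {
    static List<String> transform(List<String> source) {
        // transformation: apply a function to every element (here: uppercase)
        return source.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b", "c"); // source
        List<String> result = transform(source);            // transformation
        result.forEach(System.out::println);                // sink
    }
}
```

In Flink the same three roles are played by `env.fromElements(...)`, operators such as `map`, and `ds.print()`, with `env.execute()` triggering the job.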
Source
Collection-based Source
env.fromElements(): from individual elements
env.fromCollection(): from a collection
env.generateSequence(): generate a sequence (deprecated)
env.fromSequence(): from a number sequence
Full code:
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @author 公羽
 * date 2021/7/3
 */
public class source_set {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // processing mode

        // 2. Source
        // env.fromElements(): from elements
        DataStream<String> ds1 = env.fromElements("test1", "test2", "test3");
        // env.fromCollection(): from a collection
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("test4", "test5", "test6"));
        // env.generateSequence(): generate a sequence (deprecated)
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        // env.fromSequence(): from a sequence
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // 3. Transformation

        // 4. Sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // 5. Execute
        env.execute();
    }
}
```
File-based Source
- env.readTextFile(local/HDFS file/directory/compressed archive)
Add the required dependency to pom.xml.
To read data from HDFS, download Hadoop's configuration files hdfs-site.xml and core-site.xml and copy them into the project's resources directory.
- local file
- local directory
- HDFS
- compressed archive
Full code:
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/3
 */
public class source_file {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // processing mode

        // 2. Source
        // read a single file
        DataStream<String> ds1 = env.readTextFile("E:\\input\\test.txt");
        // read a directory
        DataStream<String> ds2 = env.readTextFile("E:\\input");
        // read a single file from HDFS
        DataStream<String> ds3 = env.readTextFile("hdfs://spark01:9000/test/test.txt");
        // read a compressed archive
        DataStream<String> ds4 = env.readTextFile("hdfs://spark01:9000/test/test.gz");

        // 3. Transformation

        // 4. Sink
        // ds1.print();
        // ds2.print();
        // ds3.print();
        ds4.print();

        // 5. Execute
        env.execute();
    }
}
```
Socket-based Source
Listen on port 9999.
Full code:
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author 公羽
 * date 2021/7/3
 */
public class source_socket {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // processing mode

        // 2. Source
        DataStream<String> ds1 = env.socketTextStream("spark01", 9999);

        // 3. Transformation
        DataStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split(" "); // split on spaces
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> ds3 = ds2.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1); // pair each word with 1
            }
        });
        // group by key and count
        KeyedStream<Tuple2<String, Integer>, String> ds4 = ds3.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> ds5 = ds4.sum(1);

        // 4. Sink
        ds5.print();

        // 5. Execute
        env.execute();
    }
}
```
Custom Source: MySQL
Install the Lombok plugin.
Add the Lombok dependency.
The data in the database:
Create the student entity class.
```java
package datastream;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 公羽
 * date 2021/7/3
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class student {
    private Integer id;
    private String name;
    private Integer age;
}
```
Create the custom source class.
```java
package datastream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

public class source_mysql extends RichParallelSourceFunction<student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    private boolean flag = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open the JDBC connection and prepare the query
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC",
                "root", "123456");
        String sql = "select * from test_flink";
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<student> sourceContext) throws Exception {
        while (flag) {
            ResultSet rs = preparedStatement.executeQuery();
            while (rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                sourceContext.collect(new student(id, name, age));
            }
            TimeUnit.SECONDS.sleep(5); // poll the table every 5 seconds
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        preparedStatement.close();
        connection.close();
    }
}
```
Write the driver class.
```java
package datastream;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/3
 */
public class driver_mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<student> ds = env.addSource(new source_mysql()).setParallelism(1);
        ds.print();
        env.execute();
    }
}
```
Transformation
Basic Operations
- flatMap: turns each element into one or more elements and returns the flattened result
- map: applies a function to every element and returns the results
- keyBy: groups the stream by the specified key; note that stream processing has keyBy rather than groupBy
- filter: filters elements by the given condition and keeps only those that satisfy it
- sum: sums the specified field within each group
- reduce: aggregates the elements of each group
Example:
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_basic {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // choose the processing mode automatically

        // 2. Source
        DataStream<String> lineDS = env.fromElements("spark heihei sqoop hadoop", "spark flink", "hadoop fink heihei spark");

        // 3. Transformation
        // 3.1 Split every line into individual words
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // s is one line; split it into words
                String[] words = s.split(" ");
                for (String word : words) {
                    // collect and emit each word
                    collector.collect(word);
                }
            }
        });
        // 3.1.5 Filter out sensitive words
        DataStream<String> filterDS = wordsDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return !s.equals("heihei");
            }
        });
        // 3.2 Map each word to a count of 1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = filterDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // s is a single word; pair it with 1
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 Group the data by key
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        // 3.4 Aggregate the values within each group, i.e. sum
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        // 3.5 The same aggregation expressed with reduce
        DataStream<Tuple2<String, Integer>> redResult = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
                return Tuple2.of(t2.f0, t2.f1 + t1.f1);
            }
        });

        // 4. Sink
        aggResult.print();
        redResult.print();

        // 5. Execute
        // Note: with DataSet, execute() is not required when print() is used; with DataStream it is.
        env.execute();
    }
}
```
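As a sanity check on what the flatMap → map → keyBy → sum chain above computes, here is the same word count written in plain Java — no Flink involved, and `WordCountSketch` is an illustrative name:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java equivalent of the Flink word count:
// split lines into words (flatMap), pair each with 1 (map),
// group by the word (keyBy), and sum the counts (sum).
public class WordCountSketch {
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {       // flatMap: line -> words
                counts.merge(word, 1, Integer::sum);    // map to (word, 1), keyBy word, sum
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("spark flink", "spark hadoop")));
    }
}
```

The difference in Flink is that the stream is unbounded, so `sum(1)` emits a running total per key instead of one final map.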
Merging and Splitting
Merging
- union and connect
union merges multiple streams of the same type into one new stream of that type; connect joins two streams that may have different element types.
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_con {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> dataStreamSource = env.fromElements("test1", "test2", "test3");
        DataStream<String> stringDataStreamSource = env.fromCollection(Arrays.asList("test4", "test5", "test6"));
        DataStream<Long> longDataStreamSource = env.generateSequence(1, 10);
        DataStream<Long> longDataStreamSource1 = env.fromSequence(1, 10);

        // union: both streams must have the same type
        DataStream<String> union = dataStreamSource.union(stringDataStreamSource);
        // connect: the two streams may have different types
        ConnectedStreams<String, Long> connect = dataStreamSource.connect(longDataStreamSource);
        DataStream<String> map = connect.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String s) throws Exception {
                return "String -> String" + " " + s;
            }

            @Override
            public String map2(Long aLong) throws Exception {
                return "Long -> String" + " " + aLong.toString();
            }
        });

        // union.print();
        map.print();
        env.execute();
    }
}
```
Splitting
- Side Outputs
With side outputs, a process function examines each element and, depending on the result, emits it to a different OutputTag, splitting one stream into several.
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_split {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Source
        DataStream<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);

        // Transformation: split the stream
        // Define the output tags
        OutputTag<Integer> tag1 = new OutputTag<Integer>("even", TypeInformation.of(Integer.class));
        OutputTag<Integer> tag2 = new OutputTag<>("odd", TypeInformation.of(Integer.class));
        SingleOutputStreamOperator<Integer> process = integerDataStreamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer integer, Context context, Collector<Integer> collector) throws Exception {
                if (integer % 2 == 0) {
                    context.output(tag1, integer);
                } else {
                    context.output(tag2, integer);
                }
            }
        });

        // Retrieve the side outputs by tag
        DataStream<Integer> sideOutput = process.getSideOutput(tag1);
        DataStream<Integer> sideOutput1 = process.getSideOutput(tag2);

        // Sink
        sideOutput.print();
        sideOutput1.print();

        // Execute
        env.execute();
    }
}
```
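The routing idea behind the side-output example above can be mimicked in plain Java — no Flink here, and `SideOutputSketch` is an illustrative name:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One pass over the "stream", routing each element to one of several named tags,
// analogous to ProcessFunction + OutputTag.
public class SideOutputSketch {
    static Map<String, List<Integer>> split(List<Integer> data) {
        Map<String, List<Integer>> tagged = new HashMap<>();
        tagged.put("even", new ArrayList<>());
        tagged.put("odd", new ArrayList<>());
        for (int n : data) {
            tagged.get(n % 2 == 0 ? "even" : "odd").add(n); // route by predicate
        }
        return tagged;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```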
Partitioning
- rebalance: round-robin repartitioning
Used to fix data skew, where a large share of the data piles up on one node while the other nodes are lightly loaded.
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class trans_para {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Source
        DataStream<Long> longDataStreamSource = env.fromSequence(1, 10000);

        // Transformation
        // The filter drops part of the data, which may leave the partitions skewed
        DataStream<Long> filter = longDataStreamSource.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 10;
            }
        });
        // Process directly: data skew is possible
        DataStream<Tuple2<Integer, Integer>> map = filter.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int id = getRuntimeContext().getIndexOfThisSubtask(); // which subtask handled this element
                return Tuple2.of(id, 1);
            }
        }).keyBy(t -> t.f0).sum(1);
        // Call rebalance() before processing to even out the partitions and fix the skew
        DataStream<Tuple2<Integer, Integer>> map1 = filter.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int id = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(id, 1);
            }
        }).keyBy(t -> t.f0).sum(1);

        // Sink
        map.print();
        map1.print();

        // Execute
        env.execute();
    }
}
```
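The effect of `rebalance()` can be pictured with a plain-Java round-robin sketch — not Flink's implementation, just the distribution idea, with illustrative names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Round-robin redistribution: element i goes to partition i % n,
// so every partition ends up with an (almost) equal share regardless
// of how skewed the input order was.
public class RebalanceSketch {
    static List<List<Integer>> rebalance(List<Integer> data, int partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < partitions; i++) out.add(new ArrayList<>());
        for (int i = 0; i < data.size(); i++) {
            out.get(i % partitions).add(data.get(i)); // round-robin assignment
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> skewed = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(rebalance(skewed, 4)); // each partition receives 2 elements
    }
}
```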
Sink
Predefined Sinks
- ds.print(): print to the console
- ds.printToErr(): print to the console via stderr, shown in red
- ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(n): write to the local filesystem or HDFS; with n = 1 the path becomes a single file, with n > 1 it becomes a directory
Full code:
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class sink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<String> ds = env.fromElements("test1", "test2", "test3");

        // Sink
        // ds.print();
        // ds.printToErr();
        // ds.writeAsText("f:/test/test1").setParallelism(1);
        ds.writeAsText("f:/test/test2").setParallelism(2);
        env.execute();
    }
}
```
Custom Sink
Write the data into MySQL.
Define the sink class.
```java
package datastream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class sink_mysql extends RichSinkFunction<student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // call the parent's open (may be removed)
        super.open(parameters);
        // load the MySQL driver and open the connection
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC",
                "root", "123456");
        String sql = "insert into test_flink(name,id,age) values(?,?,?)";
        // create the PreparedStatement
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void invoke(student value, Context context) throws Exception {
        // bind concrete values to the ? placeholders
        preparedStatement.setString(1, value.getName()); // name
        preparedStatement.setInt(2, value.getId());      // id
        preparedStatement.setInt(3, value.getAge());     // age
        // execute the SQL
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        preparedStatement.close();
        connection.close();
    }
}
```
Define the sink driver class.
```java
package datastream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 公羽
 * date 2021/7/4
 */
public class sink_driver {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<student> test1 = env.fromElements(new student(10, "test1", 100));
        test1.addSink(new sink_mysql());
        env.execute();
    }
}
```