Flink-输出算子(Sink)使用
5.5 输出算子
5.5.1 概述
- 调用print是返回输出类,作为最后一环sink存在
该方法创建了一个PrintSinkFunction 操作,然后作为addSink方法的参数
PrintSinkFunction这个类继承自RichSinkFunction富函数类
- RichSinkFunction类
- 继承了AbstractRichFunction富函数类
因此就可以调用富函数类(是一个实现类)的声明周期方法,例如open,close,以及获取运行时上下文,运行环境,定义状态等等
- RichSinkFunction类同时也实现了SinkFunction这个接口,所以本质上也是SinkFunction
- SinkFunction接口的抽象方法有invoke,传入是value,以及当前的上下文
- 关系图
- 如果需要自定义输出算子
可以调用DataStream的addSink方法
然后传入自己实现的SinkFunction
- flink提供的第三方系统连接器
5.5.2 输出到文件
- StreamingFileSink流文件输出类
- 来源
继承RichSinkFunction类,并实现CheckpointedFunction,CheckpointListener(检查点)
- 创建实例
在StreamingFileSink类中调用forRowFormat()方法传入Path以及Encoder返回StreamingFileSink.DefaultBulkFormatBuilder,DefaultBulkFormatBuilder是一个静态类并继承RowFormatBuilder类,RowFormatBuilder类又继承BucketsBuilder类,底层将数据写入bucket(桶),桶里面分大小存储分区文件,实现了分布式存储
使用Builder构建器构建
RowFormatBuilder是行编码
BulkFormatBuilder是列存储编码格式
关系图
代码
public class SinkToFileTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//2.为了得到并传入SinkFunction,需要构建StreamingFileSink的一个对象//调用forRowFormat方法或者forBulkformat方法得到一个DefaultRowFormatBuilder// 其中forBulkformat方法前面还有类型参数,以及传参要求一个目录名称,一个编码器//写入文件需要序列化,需要定义序列化方法并进行编码转换,当成Stream写入文件//然后再使用builder创建实例StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(//指定滚动策略,根据事件或者文件大小新产生文件归档保存DefaultRollingPolicy.builder()//使用builder构建实例.withMaxPartSize(1024 * 1024 * 1024).withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))//事件间隔毫秒数.withInactivityInterval(TimeUnit.MINUTES.toMinutes(15))//当前不活跃的间隔事件,隔多长事件没有数据到来.build()).build();//1.写入文件调用addSink()方法,并传入SinkFunctionstream.map(data -> data.toString())//把Event类型转换成String.addSink(streamingFileSink);env.execute();}
}
- 结果
5.5.3 输出到kafka
构造FlinkKafkaProducer类传入三个参数:brokerList(主机+端口号)和topicId(topic)以及serializationSchema(编码序列化)完成构造
- 代码
public class SinkToKafka {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.从kafka中读取数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop2:9092");properties.setProperty("group.id", "consumer-group");DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));//2.用flink进行简单的etl处理转换SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {String[] fields = value.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();}});//3.结果数据写入kafka//FlinkKafkaProducer传参borckList,topicid,序列化result.addSink(new FlinkKafkaProducer<String>("hadoop2:9092","events",new SimpleStringSchema()));env.execute();}
}
- kafka输出结果
5.5.4 输出到redis
- 引入依赖
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
- 分析
- RedisSink类分析
RedisSink类继承自RichSinkFunction
- 参数分析
去调构造方法,传入redis集群的配置FlinkJedisConfigBase以及RedisMapper写入命令
new FlinkJedisConfigBase的时候,可以使用FlinkJedisPoolConfig没毛病,直接继承的FlinkJedisConfigBase
FlinkJedisConfigBase是一个接口
实例FlinkJedisPoolConfig的时候也是使用的构造器Builder()的设计模式即,同样再使用.build实例它
- 第二个参数分析
RedisMapper是一个接口
自定义一个实现类并重写方法getCommandDescription(),getKeyFromData(Event data),getValueFromData(Event data)
- 关系图
- 代码
public class SinkToRedis {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入ClickSource是自定义输入DataStreamSource<Event> stream = env.addSource(new ClickSource());//2.创建一个jedis连接配置//FlinkJedisPoolConfig直接继承的FlinkJedisConfigBaseFlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop2").build();//3.写入redisstream.addSink(new RedisSink<>(config,new MyRedisMapper()));env.execute();}//3.自定义类实现 redisMapper接口public static class MyRedisMapper implements RedisMapper<Event>{@Override//返回一个redis命令的描述public RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET,"clicks");//写入哈希表}@Override//把key定义成userpublic String getKeyFromData(Event data) {return data.user;}@Override//把value定义成urlpublic String getValueFromData(Event data) {return data.url;}}
}
- 结果
运行redis
[hadoop1@hadoop2 redis]$ ./src/redis-server [hadoop1@hadoop2 bin]$ pwd
/usr/local/bin
5.5.5 输出到ElasticSearch
- 引入依赖
<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifact
Id>
<version>${flink.version}</version>
</dependency>
- 分析
- ElasticsearchSink类分析
ElasticsearchSink类继承ElasticsearchSinkBase抽象类,ElasticsearchSinkBase抽象类继承RichSinkFunction接口
- 实例
ElasticsearchSink类调用Builder()传入参数是List和ElasticsearchSinkFunction
HttpHost需要参数主机名和端口号
是一个接口,写一个实现类重写他的方法,写入逻辑
- 关系图
- 代码
public class SinToES {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//2.定义hosts的列表ArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("hadoop",9200));//3.定义ElasticsearchSinkFunction<T>,是个接口,重写process方法//向es发送请求,并插入数据ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {@Override//输入,运行上下文,发送任务请求public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {HashMap<String, String> map = new HashMap<>();map.put(element.user, element.url);//构建一个indexrequestIndexRequest request = Requests.indexRequest().index("clicks").type("types").source(map);indexer.add(request);}};//4.写入es//传入参数是List<HttpHost>和ElasticsearchSinkFunction<T>stream.addSink(new ElasticsearchSink.Builder<>(httpHosts,elasticsearchSinkFunction).build());env.execute();}
}
- 结果
5.5.6 输入到Mysql
- 引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>
- 分析
- JdbcSink来源
无继承,无实现
定义了sink方法,三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置,然后返回SinkFunction
- 参数分析
JdbcStatementBuilder是个接口,实现了BiConsumerWithException接口
单一抽象方法accept(),lambda使用
构造器私有,因此调用JdbcConnectionOptionsBuilder.build()进行实例化
- 关系图
- 代码
public class SinkToMysql {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置stream.addSink(JdbcSink.sink("INSERT INTO clicks (user,url) VALUES(?,?)",((statement,event)->{statement.setString(1,event.user);statement.setString(2,event.url);}),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test2").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("123456").build()));env.execute();}
}
- mysql前期准备
- 创建mysql的test2
- 创建clicks表
mysql> create table clicks(-> user varchar(20) not null,-> url varchar(100) not null);
Query OK, 0 rows affected (0.02 sec)
- 结果
5.5.7 自定义Sink输出
- 分析
调用DataStream的addSink()方法,并传入自定义好的SinkFunction(采用富函数类),重写关键方法invoke(),并且重写富函数类的生命周期相关方法open和close
- 导入依赖
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version>
</dependency>
- 代码
略
Flink-输出算子(Sink)使用相关推荐
- 大数据之flink常用算子
Flink分为: DataSet(批处理),DataStream(流处理),他们的方法都分别为Source.Transformation.Sink: Source:负责数据的读取 Transforma ...
- 解决Flink输出日志中时间比当前时间晚8个小时的问题
Flink安装在CentOS7上,默认时间是UTC时间,查看Flink日志,发现输出时间比当前时间晚8个小时. 通过如下命令,调整成北京时间 cp /usr/share/zoneinfo/Asia/S ...
- Flink SQL 自定义 Sink
1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...
- 【Flink】Flink 自定义 redis sink
1.概述 内部要做 Flink SQL 平台,本文以自定义 Redis Sink 为例来说明 Flink SQL 如何自定义 Sink 以及自定义完了之后如何使用 基于 Flink 1.11 2.步骤 ...
- flink常见算子的一些操作
常见Transformation操作 map和filter /*** 数据源:1 2 3 4 5.....源源不断过来* 通过map打印一下接受到数据* 通过filter过滤一下数据,我们只需要偶数* ...
- Flink sql:Table sink doesn‘t support consuming update and delete changes which is produced by node
一.问题描述 Flink sql将kafka作为join的输出,报错: Exception in thread "main" org.apache.flink.table.api. ...
- Flink实操 : Sink操作
. 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...
- flink source和sink
flink中的source作为整个stream中的入口,而sink作为整个stream的终点. SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel ...
- flink API之Sink入门
kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...
最新文章
- redis如何通过读写分离来承载读请求QPS超过10万多
- mysql-8.0.17解压版安装步骤及MySQL服务无法启动问题的解决办法
- python介绍和用途-python匿名函数的介绍及用途
- 【Paper】2015_El H_Decentralized Control Architecture for UAV-UGV Cooperation
- hwnd = 0 各种粗心大意啊!
- 基于JavaSwing+Mysql点餐系统设计和实现
- 寂寞了就去搞钱?俞敏洪举报“俞敏洪”:“搞钱论”没一句话是我说的
- 列表元素循环移位中Python切片的妙用
- skills --札记
- vue.js 入门案例,双向绑定实现任务清单
- 「leetcode」 1382. 将二叉搜索树变平衡:【构造平衡二叉搜索树】详解
- yyds!Java 性能优化的 50 个细节(珍藏版)
- 京东风格的移动端Vue组件库NutUI2.0来啦
- Unity中的几个”近义词”
- 特洛伊木马与计算机病毒有什么区别,特洛伊木马Vs病毒Vs蠕虫, 有什么区别?...
- Git小乌龟(TortoiseGit) 简单提交代码到github
- matlab 交换两列数据,在MATLAB单元阵列的两列中列出单元格内容(List cell contents in two columns of MATLAB cell array)...
- b站修改密码服务器错误,提示账号或者密码错误,无法正常登陆
- dns被劫持了怎么处理 5种方法教你处理
- 阿里云 Elasticsearch简介和购买流程