5.5 输出算子

5.5.1 概述

  1. 调用print是返回输出类,作为最后一环sink存在

该方法创建了一个PrintSinkFunction 操作,然后作为addSink方法的参数

PrintSinkFunction这个类继承自RichSinkFunction富函数类

  1. RichSinkFunction类
  • 继承了AbstractRichFunction富函数类

因此就可以调用富函数类(是一个实现类)的声明周期方法,例如open,close,以及获取运行时上下文,运行环境,定义状态等等

  • RichSinkFunction类同时也实现了SinkFunction这个接口,所以本质上也是SinkFunction

  • SinkFunction接口的抽象方法有invoke,传入是value,以及当前的上下文
  1. 关系图

  1. 如果需要自定义输出算子

可以调用DataStream的addSink方法

然后传入自己实现的SinkFunction

  1. flink提供的第三方系统连接器

5.5.2 输出到文件

  1. StreamingFileSink流文件输出类
  • 来源

继承RichSinkFunction类,并实现CheckpointedFunction,CheckpointListener(检查点)

  • 创建实例

在StreamingFileSink类中调用forRowFormat()方法传入Path以及Encoder返回StreamingFileSink.DefaultBulkFormatBuilder,DefaultBulkFormatBuilder是一个静态类并继承RowFormatBuilder类,RowFormatBuilder类又继承BucketsBuilder类,底层将数据写入bucket(桶),桶里面分大小存储分区文件,实现了分布式存储

使用Builder构建器构建

RowFormatBuilder是行编码

BulkFormatBuilder是列存储编码格式

  • 关系图

  • 代码

public class SinkToFileTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//2.为了得到并传入SinkFunction,需要构建StreamingFileSink的一个对象//调用forRowFormat方法或者forBulkformat方法得到一个DefaultRowFormatBuilder//  其中forBulkformat方法前面还有类型参数,以及传参要求一个目录名称,一个编码器//写入文件需要序列化,需要定义序列化方法并进行编码转换,当成Stream写入文件//然后再使用builder创建实例StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(//指定滚动策略,根据事件或者文件大小新产生文件归档保存DefaultRollingPolicy.builder()//使用builder构建实例.withMaxPartSize(1024 * 1024 * 1024).withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))//事件间隔毫秒数.withInactivityInterval(TimeUnit.MINUTES.toMinutes(15))//当前不活跃的间隔事件,隔多长事件没有数据到来.build()).build();//1.写入文件调用addSink()方法,并传入SinkFunctionstream.map(data -> data.toString())//把Event类型转换成String.addSink(streamingFileSink);env.execute();}
}
  • 结果

5.5.3 输出到kafka

构造FlinkKafkaProducer类传入三个参数:brokerList(主机+端口号)和topicId(topic)以及serializationSchema(编码序列化)完成构造

  1. 代码
public class SinkToKafka {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.从kafka中读取数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop2:9092");properties.setProperty("group.id", "consumer-group");DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));//2.用flink进行简单的etl处理转换SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {String[] fields = value.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();}});//3.结果数据写入kafka//FlinkKafkaProducer传参borckList,topicid,序列化result.addSink(new FlinkKafkaProducer<String>("hadoop2:9092","events",new SimpleStringSchema()));env.execute();}
}
  1. kafka输出结果

5.5.4 输出到redis

  1. 引入依赖
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
  1. 分析
  • RedisSink类分析

RedisSink类继承自RichSinkFunction

  • 参数分析

去调构造方法,传入redis集群的配置FlinkJedisConfigBase以及RedisMapper写入命令

new FlinkJedisConfigBase的时候,可以使用FlinkJedisPoolConfig没毛病,直接继承的FlinkJedisConfigBase

FlinkJedisConfigBase是一个接口

实例FlinkJedisPoolConfig的时候也是使用的构造器Builder()的设计模式即,同样再使用.build实例它

  • 第二个参数分析

RedisMapper是一个接口

自定义一个实现类并重写方法getCommandDescription(),getKeyFromData(Event data),getValueFromData(Event data)

  • 关系图

  1. 代码
public class SinkToRedis {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入ClickSource是自定义输入DataStreamSource<Event> stream = env.addSource(new ClickSource());//2.创建一个jedis连接配置//FlinkJedisPoolConfig直接继承的FlinkJedisConfigBaseFlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop2").build();//3.写入redisstream.addSink(new RedisSink<>(config,new MyRedisMapper()));env.execute();}//3.自定义类实现 redisMapper接口public static class MyRedisMapper implements RedisMapper<Event>{@Override//返回一个redis命令的描述public RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET,"clicks");//写入哈希表}@Override//把key定义成userpublic String getKeyFromData(Event data) {return data.user;}@Override//把value定义成urlpublic String getValueFromData(Event data) {return data.url;}}
}
  1. 结果

运行redis

[hadoop1@hadoop2 redis]$ ./src/redis-server [hadoop1@hadoop2 bin]$ pwd
/usr/local/bin

5.5.5 输出到ElasticSearch

  1. 引入依赖
<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifact
Id>
<version>${flink.version}</version>
</dependency>
  1. 分析
  • ElasticsearchSink类分析

ElasticsearchSink类继承ElasticsearchSinkBase抽象类,ElasticsearchSinkBase抽象类继承RichSinkFunction接口

  • 实例

ElasticsearchSink类调用Builder()传入参数是List和ElasticsearchSinkFunction


HttpHost需要参数主机名和端口号


是一个接口,写一个实现类重写他的方法,写入逻辑

  • 关系图
  1. 代码
public class SinToES {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//2.定义hosts的列表ArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("hadoop",9200));//3.定义ElasticsearchSinkFunction<T>,是个接口,重写process方法//向es发送请求,并插入数据ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {@Override//输入,运行上下文,发送任务请求public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {HashMap<String, String> map = new HashMap<>();map.put(element.user, element.url);//构建一个indexrequestIndexRequest request = Requests.indexRequest().index("clicks").type("types").source(map);indexer.add(request);}};//4.写入es//传入参数是List<HttpHost>和ElasticsearchSinkFunction<T>stream.addSink(new ElasticsearchSink.Builder<>(httpHosts,elasticsearchSinkFunction).build());env.execute();}
}
  1. 结果

5.5.6 输入到Mysql

  1. 引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>
  1. 分析
  • JdbcSink来源

无继承,无实现

定义了sink方法,三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置,然后返回SinkFunction

  • 参数分析

JdbcStatementBuilder是个接口,实现了BiConsumerWithException接口

单一抽象方法accept(),lambda使用

构造器私有,因此调用JdbcConnectionOptionsBuilder.build()进行实例化

  • 关系图

  1. 代码
public class SinkToMysql {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置stream.addSink(JdbcSink.sink("INSERT INTO clicks (user,url) VALUES(?,?)",((statement,event)->{statement.setString(1,event.user);statement.setString(2,event.url);}),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test2").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("123456").build()));env.execute();}
}
  1. mysql前期准备
  • 创建mysql的test2
  • 创建clicks表
mysql> create table clicks(-> user varchar(20) not null,-> url varchar(100) not null);
Query OK, 0 rows affected (0.02 sec)
  1. 结果

5.5.7 自定义Sink输出

  1. 分析

调用DataStream的addSink()方法,并传入自定义好的SinkFunction(采用富函数类),重写关键方法invoke(),并且重写富函数类的生命周期相关方法open和close

  1. 导入依赖
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version>
</dependency>
  1. 代码

Flink-输出算子(Sink)使用相关推荐

  1. 大数据之flink常用算子

    Flink分为: DataSet(批处理),DataStream(流处理),他们的方法都分别为Source.Transformation.Sink: Source:负责数据的读取 Transforma ...

  2. 解决Flink输出日志中时间比当前时间晚8个小时的问题

    Flink安装在CentOS7上,默认时间是UTC时间,查看Flink日志,发现输出时间比当前时间晚8个小时. 通过如下命令,调整成北京时间 cp /usr/share/zoneinfo/Asia/S ...

  3. Flink SQL 自定义 Sink

    1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...

  4. 【Flink】Flink 自定义 redis sink

    1.概述 内部要做 Flink SQL 平台,本文以自定义 Redis Sink 为例来说明 Flink SQL 如何自定义 Sink 以及自定义完了之后如何使用 基于 Flink 1.11 2.步骤 ...

  5. flink常见算子的一些操作

    常见Transformation操作 map和filter /*** 数据源:1 2 3 4 5.....源源不断过来* 通过map打印一下接受到数据* 通过filter过滤一下数据,我们只需要偶数* ...

  6. Flink sql:Table sink doesn‘t support consuming update and delete changes which is produced by node

    一.问题描述 Flink sql将kafka作为join的输出,报错: Exception in thread "main" org.apache.flink.table.api. ...

  7. Flink实操 : Sink操作

    . 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...

  8. flink source和sink

    flink中的source作为整个stream中的入口,而sink作为整个stream的终点. SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel ...

  9. flink API之Sink入门

    kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...

最新文章

  1. redis如何通过读写分离来承载读请求QPS超过10万多
  2. mysql-8.0.17解压版安装步骤及MySQL服务无法启动问题的解决办法
  3. python介绍和用途-python匿名函数的介绍及用途
  4. 【Paper】2015_El H_Decentralized Control Architecture for UAV-UGV Cooperation
  5. hwnd = 0 各种粗心大意啊!
  6. 基于JavaSwing+Mysql点餐系统设计和实现
  7. 寂寞了就去搞钱?俞敏洪举报“俞敏洪”:“搞钱论”没一句话是我说的
  8. 列表元素循环移位中Python切片的妙用
  9. skills --札记
  10. vue.js 入门案例,双向绑定实现任务清单
  11. 「leetcode」 1382. 将二叉搜索树变平衡:【构造平衡二叉搜索树】详解
  12. yyds!Java 性能优化的 50 个细节(珍藏版)
  13. 京东风格的移动端Vue组件库NutUI2.0来啦
  14. Unity中的几个”近义词”
  15. 特洛伊木马与计算机病毒有什么区别,特洛伊木马Vs病毒Vs蠕虫, 有什么区别?...
  16. Git小乌龟(TortoiseGit) 简单提交代码到github
  17. matlab 交换两列数据,在MATLAB单元阵列的两列中列出单元格内容(List cell contents in two columns of MATLAB cell array)...
  18. b站修改密码服务器错误,提示账号或者密码错误,无法正常登陆
  19. dns被劫持了怎么处理 5种方法教你处理
  20. 阿里云 Elasticsearch简介和购买流程

热门文章

  1. 你的样子 - 罗大佑
  2. SpringBoot--启动时的事件机制
  3. 1.6 Terrain
  4. PAT1080 Graduate Admission (30)
  5. iOS- NSTimeInterval获取时间间隔
  6. 从零开始微信公众号开发
  7. JavaWeb课程设计——图书管理系统
  8. 多线程 - Windows下线程池的使用
  9. JSP实现简易购物商城
  10. 跟着Nature学绘图!基于ggplot2的生存曲线绘制R包