1.4 Flink HDFS Connector /Flink HDFS连接器
在上一章节已经翻译了Flink Kafka Connector,但由于HDFS作为大多数研究大数据者日常用到的,此章节并添加翻译HDFS的连接器。
此连接器提供了一个Sink,将分区文件写入Hadoop FileSystem支持的任何文件系统。要使用此连接器,请将以下依赖项添加到您的项目中:
<dependency>
<groupId> org.apache.flink </groupId>
<artifactId> flink-connector-filesystem_2.10 </artifactId>
<version> 1.2.0 </version>
</dependency>
请注意,流连接器当前不是二进制分发的一部分。有关如何将程序与程序库打包以进行集群执行的信息,请参阅此处。
折叠文件接收器(Bucketing File Sink)
可以配置压力行为以及写入操作,但我们稍后将会介绍。这是你如何创建一个耐心的病人,默认情况下,它会收敛到按时间分割的滚动文件:
<Java代码>
DataStream <String> input = ...;
input.addSink(new BucketingSink <String>(“/base/path”));
唯一必需的参数是存储桶的基本路径。可以通过指定自定义bucketer,写入器和批量大小来进一步配置接收器。
默认情况下,当元素到达时,当前的系统时间将会降级,并使用日期时间模式“yyyy-MM-dd-HH”命名这些存储区。此模式将传递给具有当前系统时间的SimpleDateFormat以形成存储桶路径。每当遇到新的日期时,都会创建一个新的桶。例如,如果您有一个包含分钟作为最细粒度的模式,您将每分钟获得一个新的桶。每个桶本身是一个包含几个零件文件的目录:每个并行实例的接收器将创建自己的零件文件,当零件文件变得太大时,槽也将在其他文件旁边创建一个新的零件文件。当桶变得不活动时,打开的零件文件将被刷新并关闭。当最近没有写入时,桶被视为不活动。默认情况下,接收器每分钟检查不活动的桶,并关闭一分钟内未写入的任何桶。可以在BucketingSink上使用setInactiveBucketCheckInterval()和setInactiveBucketThreshold()配置此行为。
您也可以使用BucketingSink上的setBucketer()指定自定义bucketer。如果需要,bucketer可以使用元素或元组的属性来确定bucket目录。
默认的作者是StringWriter。这将调用toString()对传入的元素,并将它们写入部分文件,用换行符分隔。要在BucketingSink上指定一个自定义的作者,请使用setWriter()。如果要编写Hadoop SequenceFiles,可以使用提供的SequenceFileWriter,它也可以配置为使用压缩。
最后一个配置选项是批量大小。这指定何时应该关闭零件文件并启动一个新的零件。 (默认部分文件大小为384 MB)。
例:
<Java代码>
DataStream <Tuple2 <IntWritable,Text >> input = ...;
sinketingSink <String> sink = new BucketingSink <String>(“/ base / path”);
sink.setBucketer(new DateTimeBucketer <String>(“yyyy-MM-dd-HHmm”));
sink.setWriter(new SequenceFileWriter <IntWritable,Text>());
sink.setBatchSize(1024 * 1024 * 400); //这是400 MB,
input.addSink(sink);
这将创建一个写入到遵循此模式的桶文件的接收器:
/ base / path / {date-time} / part- {parallel-task} - {count}
其中date-time是从日期/时间格式获取的字符串,parallel-task是并行接收器实例的索引,count是由于批量大小而创建的部分文件的运行数。
转载于:https://blog.51cto.com/12819839/1916383
1.4 Flink HDFS Connector /Flink HDFS连接器相关推荐
- Could not create FileSystem for highly available storage path (hdfs://node7-1/flink/ha/flinkCluster)
flink 集群 Standalone 模式 高可用部署无法启动已解决 hadoop , zookeeper,工作正常,flink-standalone 启动正常.在搭建HA集群时,集群启动未报错,查 ...
- Apache Flink 零基础入门(二十)Flink kafka connector
内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...
- flink redis connector(支持flink sql)
flink redis connector(支持flink sql) 1. 背景 工作原因,需要基于flink sql做redis sink,但bahir 分支的flink connector支持只是 ...
- 【Flink】Apache Flink 1.13.0 正式发布,流处理应用更加简单高效
1.概述 转载:Apache Flink 1.13.0 正式发布,流处理应用更加简单高效 侵权可删,这里是做个笔记,防止找不到. Flink 1.13 发布了!Flink 1.13 包括了超过 200 ...
- Flink(初识Flink,快速上手)
目录 初识Flink Flink设计理念 Flink的应用 Flink在企业中的应用 Flink的主要应用场景 流式数据处理的发展和演变 流处理和批处理 传统事务处理 有状态的流处理 Lambda 架 ...
- Flink知识点总结 Flink简介
Flink简介 Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算.并且 Flink 提供了数据分布.容错机制以及资源管理等核心功能.Flink提供了诸多高抽象层的API以便 ...
- 【Flink基础】-- Flink CDC介绍
一.Flink CDC 是什么? 2020年 Flink cdc 首次在 Flink forward 大会上官宣, 由 Jark Wu & Qingsheng Ren 两位大佬 介绍,原始 b ...
- HDFS权限设置 \ HDFS涉及ACLs的命令
本文主要参考:https://www.cnblogs.com/royfans/p/7326859.html https://www.cppentry.com/bencandy.php?fid=115& ...
- 《从0到1学习Flink》—— 介绍Flink中的Stream Windows
前言 目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语(例如,"windowin ...
最新文章
- PHP 防XSS跨站攻击
- mysql 自增列坏处_MySQL--更新自增列的潜在风险
- vue 方法获取返回值_vue.js - vuex异步提交,怎么获取返回数据
- sql行数少于10_如何用少于100行的代码创建生成艺术
- 介绍一种新的激活函数族ACON
- 第十章:在Spark集群上掌握比较重要的图操作之Computing Degree
- Web API Filter ActionFilterAttribute 使用
- 坐标偏差大_三坐标常见撞针原因,总结的太到位了!
- 建立唯一索引后mysql策略_【MySQL】MySQL索引背后的之使用策略及优化【转】
- 在ASP.NET中如何添加过滤器
- 关于DYNPRO程序的系统迁移与版本不匹配问题之一
- 【杂谈】我学习这么好,为什么找不到工作?
- JSONArray.fromObject(str)
- scratch---迷宫夺宝游戏实现,完整代码分享,多种地图与角色特效都有素材包!
- SAP BAPI_EXCHANGERATE_GETDETAIL计算货币转换汇率
- 群晖java安装失败_群晖NAS安装Jenkins
- 【IEEE_Verilog-4.4】Verilog中的充电强度charge strength和驱动强度drive strength
- Java SDK和Java JDK的区别
- 调节latex表格中的字体大小
- linux如何配浏览器证书,部署国密SSL证书,如何兼容国际主流浏览器?
热门文章
- 二十年后我发明了保姆机器人作文_小学生作文“二十年后的我”走红,老师看完气愤,让学生站着听课...
- 邵阳市计算机学校2021,邵阳市计算机中等专业学校在2021年邵阳市中等职业学校班主任基本功比赛中荣获市一等奖...
- 织梦网站调用变量失败_(自适应手机版)响应式精密机械模具类网站织梦模板 织梦仪器模具加工设备网站模板下载...
- python计算两个正整数的和差积商_已知两个正整数的和与积求这两个数
- python dataframe 选取字段 特别慢_从parqu读取dask dataframe列重命名速度较慢(er)
- n3k配置vpc是否还需要配置hsrp_HSRP
- sql 整改措施 注入_记一次Sql注入 解决方案
- h5调微信支付 unkonw url_h5移动端调用支付宝、微信支付的实现
- python random函数shuffle_Python|有趣的shuffle方法
- html一边自动宽度,有2列,希望右侧固定宽度,左侧自动宽度。_html/css_WEB-ITnose...