Flink对接OBS方案
有两种方案可供选择

仿distributehdfs方案:逻辑简单代码量少,约束少,强约束2.Truncate接口有一个强约束:并行文件桶才支持,对象桶不支持(obs方面在大数据场景下主推并行文件桶,不再推对象桶)
仿s3方案:逻辑复杂且可能有约束和场景限制
仿distributehdfs方案
(1)DLI同事要判断一下风险点

StreamingFileSink:OBS内部已经进行了评估,结论是可行,DLI同事也要判断一下风险点
BucketingSink:DLI同事要判断一下
OutPutFormatSinkFunction:DLI同事要判断一下
(2)OBS侧要开发trunct接口

(3)DLI提需求单

仿distributehdfs方案进展
flink进展:flink1.11版本对接obs
1.目标:
(1)obs作为实时流的sink端以及作为checkpoint存储。
(2)作为stream source和批计算场景暂不考虑:因为此部分没有深入研究,后续希望@黄崴 可以提供支撑

2.方案:通过实现Flink的filesystem抽象类,仿照flink源码中的flink-filesystems/flink-hadoop-fs模块(因为OBSFileSystem实现了trunct接口,无需参考s3实现)

3.flink dataStream编程:通过streamFileSink对接obs
已经验证:
(1)FsStateBackend存储后端为obs功能初步验证:ok
(2)streamFileSink对接obs功能初步验证:ok
未验证:
(1)RocksDBStateBackend存储后端为obs未验证
(2)FsStateBackend存储后端为obs故障恢复场景未验证
(3)FsStateBackend存储后端为obs性能未验证
(4)streamFileSink对接obs性能未验证

Flink消费Kafka数据,写入HDFS - 使用 StreamingFileSink_magic_kid_2010的博客-CSDN博客_oncheckpointrollingpolicy

4.flink table/sql编程:(“方案”中的方法是否可以支撑下述两种场景。希望@黄崴 可以提供信息支撑)
(1)实时写入filesystem(OBS) Flink教程-flink 1.11使用sql将流式数据写入文件系统_大数据技术与应用实战的博客-CSDN博客_flink sql 写文件
(2)实时写入OBS上的hive Flink教程--flink 1.11 使用sql将流式数据写入hive_大数据技术与应用实战的博客-CSDN博客_flinksql 写入hive

flink table/sql编程对接obs整体均未验证

仿s3方案(不走此路径)
StreamingFileSink机制对接
OBS侧放开了一些接口
DLI已经开发联调完毕
DLI已经提需求单要求正式出包
Flink对接OBS开发
关于代码和jar构建问题:目前工程和依赖编译是仿照flink-azure-fs-hadoop,代码实现是仿照flink-hadoop-fs和部分flink-azure/oss-fs-hadoop,其依赖了flink-fs-hadoop-shaded模块(maven仓库中没有),后面依赖编译要仿照flink-s3-fs-hadoop模块进行
关于配置文件问题:要研究清楚flink-hadoop-fs 模块中HadoopFsFactory中关于配置相关的代码
测试相关:
(1)单元集成测试:org.apache.flink.core.fs包下的测试用例和org.apache.flink.runtime.fs.hdfs包下的测试用例

(2)性能测试用例:

和hdfs对比

1.Intel的Hi-Bench:
GitHub - Intel-bigdata/HiBench: HiBench is a big data benchmark suite.
2.Yahoo的streaming-benchmarks:
GitHub - yahoo/streaming-benchmarks: Benchmarks for Low Latency (Streaming) solutions including Apache Storm, Apache Spark, Apache Flink, ...
Yahoo的流计算引擎基准测试 | 并发编程网 – ifeve.com
%8E%E5%9F%BA%E5%87%86%E6%B5%8B%E8%AF%95/

3.flink batch sql测试:
GitHub - ververica/flink-sql-benchmark
Flink 1.10 和 Hive 3.0 性能对比(附 Demo 演示 PPT)_Apache Flink的博客-CSDN博客

4.其他Flink基准测试
流计算框架 Flink 与 Storm 的性能对比 - 美团技术团队
360深度实践:Flink与Storm协议级对比_高可用架构的博客-CSDN博客
http://www.louisvv.com/archives/2216.html
干货 | Flink及主流流框架比较
GitHub - apache/flink-benchmarks: Benchmarks for Apache Flink

附:Flink的文件系统
1.Apache Flink 1.10 Documentation: 文件系统

2.Apache Flink 1.10 Documentation: File Systems:

总结(非常重要)
Flink实战之StreamingFileSink如何写数据到其它HA的Hadoop集群_scx_white的博客-CSDN博客

Flink有自己的文件系统抽象(主要包含文件系统操作和故障恢复语义),寻找实现的过程如下:

首先通过java service loader机制在meta-info的service目录下寻找所有的org.apache.flink.core.fs.FileSystemFactory(LocalFileSystemFactory也显式加载进去)
通过路径的shcema的判断是否有自己的对应实现:
如果有就通过对应的FileSystemFactory创建flink FileSystem的实现;

如果没有就通过flink-hadoop-fs模块中的HadoopFsFactory创建flink FileSystem的实现,即桥接到hadoop fileSystem;

对flink-hadoop-fs模块的理解:
对flink Filesystem的实现之一
一方面作为flink FileSystem和hadoop FileSystem的桥接模块;
一方面在distributeHDFS的基础上实现了flink FileSystem中的故障恢复语义
We recommend using Flink’s built-in file systems unless required otherwise. Using a Hadoop File System directly may be required, for example, when using that file system for YARN’s resource storage, via the fs.defaultFS configuration property in Hadoop’s core-site.xml.
通过SinkFunction实现数据落地到flink FileSystem,总共有三种方案
OutPutFormatSinkFunction方案(位于flink-streaming-java模块)
官方标注OutPutFormatSinkFunction过时了可使用BucketingSink取代
不能进行分区
不能提供exactly-once semantics and fault-tolerance
BucketingSink方案(位于flink-connector-filesystem connector模块)
有一些主要的限制(对S3的一致输出,支持Parquet和ORC等格式),如果不重新设计就无法解决。
StreamingFileSink方案((位于flink-streaming-java模块))
Flink fileSystem的应用场景
读写数据
StateBackend
其他Flink 需要使用文件系统 URI 的位置:JobManager 高可用配置 
 例子:

// 读取 S3 bucketenv.readTextFile("s3://<bucket>/<endpoint>");

// 写入 S3 bucketstream.writeAsText("s3://<bucket>/<endpoint>");

// 使用 S3 作为 FsStatebackendenv.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

Flink fileSystem的实现
已有实现
Local File System
Amazon S3 object storage is supported by two alternative implementations: flink-s3-fs-presto and flink-s3-fs-hadoop. Both implementations are self-contained with no dependency footprint.
MapR FS file system adapter is already supported in the main Flink distribution under the maprfs:// URI scheme. You must provide the MapR libraries in the classpath (for example in lib directory).
OpenStack Swift FS is supported by flink-swift-fs-hadoop and registered under the swift:// URI scheme. The implementation is based on the Hadoop Project but is self-contained with no dependency footprint. To use it when using Flink as a library, add the respective maven dependency (org.apache.flink:flink-swift-fs-hadoop:1.10.0).
Aliyun Object Storage Service is supported by flink-oss-fs-hadoop and registered under the oss:// URI scheme. The implementation is based on the Hadoop Project but is self-contained with no dependency footprint.
Azure Blob Storage is supported by flink-azure-fs-hadoop and registered under the wasb(s):// URI schemes. The implementation is based on the Hadoop Project but is self-contained with no dependency footprint.
创建一个新的flink fileSystem实现
Add the File System implementation, which is a subclass of org.apache.flink.core.fs.FileSystem.
Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of org.apache.flink.core.fs.FileSystemFactory.
Add a service entry. Create a file META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains the class name of your file system factory class (see the Java Service Loader docs for more details).
flink fileSystem的加载使用
Attention The plugin mechanism for file systems was introduced in Flink version 1.9 to support dedicated Java class loaders per plugin and to move away from the class shading mechanism. You can still use the provided file systems (or your own implementations) via the old mechanism by copying the corresponding JAR file into lib directory. However, since 1.10, s3 plugins must be loaded through the plugin mechanism; the old way no longer works as these plugins are not shaded anymore (or more specifically the classes are not relocated since 1.10).

It’s encouraged to use the plugins-based loading mechanism for file systems that support it. Loading file systems components from the lib directory will not supported in future Flink versions.

附:FileSink连接器
FileSink在flink中大致有三种实现方式:

OutPutFormatSinkFunction(内置实现,通过FileOutputFormat构造,过时了)
BucketingSink(位于flink-connector-filesystem connector模块中)
StreamingFileSink(内置实现)
StreamingFileSink和BucketingSink的区别???

状态数据存储在file中使用了上述那种方式???

正在上传…
取消

OutPutFormatSinkFunction
OutPutFormatSinkFunction缺陷:

0.官方标注OutPutFormatSinkFunction过时了可使用BucketingSink取代

1.不能进行分区

2.不能提供exactly-once semantics and fault-tolerance

正在上传…
取消

OutPutFormatSinkFunction原理:通过FileOutputFormat构造

1.其open方法:调用FileOutputFormat的configure和open方法

2.其invoke方法:调用FileOutputFormat的writeRecord()方法进行真正的写操作

3.其close方法:调用FileOutputFormat的close方法

BucketingSink
(1)org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink中的类说明

(2)官方文档:

https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html

网上资料:Flink FileSink 自定义输出路径——BucketingSink - Flink菜鸟 - 博客园
Flink FileSystem的connector分析_Android路上的人的博客-CSDN博客

Flink HDFS Sink 如何保证 exactly-once 语义_kisimple的博客-CSDN博客

Flink从BucketSink看checkpoint与故障恢复 - 简书

BucketingSink缺陷:

有一些主要的限制(对S3的一致输出,支持Parquet和ORC等格式),如果不重新设计就无法解决。StreamingFileSink实现这个新设计,是一个最终将取代的新BucketingSink???但是,正如您在当前版本(Flink 1.6)中注意到的那样,StreamingFileSink还不支持BucketingSink的所有功能)

正在上传…
取消

BucketingSink原理:

1.其initializeState方法:创建了fileSysterm对象,并从持久化状态后端恢复状态

2.其open方法:在processingTimeService中根据inactiveBucketCheckInterval注册了一个定时器,到时间时回调ProcessingTimeCallback接口的onProcessingTime方法,其将关闭不再写入的partFile

3.其invoke方法:

public void invoke(T value) {

//通过分区器Bucketer获取分区路径
   Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);

long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
   BucketState<T> bucketState = state.getBucketState(bucketPath);
   if (bucketState == null) {
      bucketState = new BucketState<>(currentProcessingTime);
      state.addBucketState(bucketPath, bucketState);
   }
   //根据大小(调用Writer的getPos方法获取大小)和时间判断是否进行滚动
   if (shouldRoll(bucketState, currentProcessingTime)) {

//滚动时先关闭(调用writer的close,且将progressing状态的文件变为pedding状态);再创建(调用writer的open方法,此时文件处于progressing状态)
      openNewPartFile(bucketPath, bucketState);
   }
  //调用Writer的write方法进行真正的写入
   bucketState.writer.write(value);
   bucketState.lastWrittenToTime = currentProcessingTime;
}

其snapshotState方法:调用Writer的flush方法
其notifyCheckpointComplete方法:将pedding状态的文件改为finish状态
分区:Bucketer接口

正在上传…
取消

2.滚动:大小和时间

3.文件格式:writer接口

正在上传…
取消

4.和Checkpoint结合提供exactly-once semantics and fault-tolerance:

StreamingFileSink
(1)org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink中的类说明

(2)官方文档:

https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html

1.10.0中文文档:

Apache Flink 1.10 Documentation: Streaming File Sink

(3)网上资料:

Flink StreamingFileSink源码解析_lvwenyuan_1的博客-CSDN博客(源码分析)

Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较 - Flink菜鸟 - 博客园

Flink-1.10中的StreamingFileSink相关特性

Flink-1.10中的StreamingFileSink相关特性 - 最弱无败 - 博客园

StreamingFileSink压缩与合并小文件:StreamingFileSink压缩与合并小文件 - 简书

StreamFileSink与BucketingSink的区别:

Flink StreamFileSink与BucketingSink的区别_magic_kid_2010的博客-CSDN博客

Flink1.9系列-StreamingFileSink vs BucketingSink篇_枫叶的落寞的博客-CSDN博客

当BucketingSink存在时,为什么我们需要StreamingFileSink?-问答-阿里云开发者社区-阿里云

Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较..._anfuyi5792的博客-CSDN博客

正在上传…
取消

1.Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。

2.桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。

部分文件(part file)可以处于以下三种状态之一:

In-progress :当前文件正在写入中
Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

3. 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。

4.目前 StreamingFileSink 只支持三种文件系统: HDFS、S3/flink-s3-fs-hadoop和Local

DataStream<Tuple2<LongWritable, Text>> input = ...;

StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink

.forRowFormat((new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))

.withBucketAssigner(new KeyBucketAssigner())//分桶策略

.withRollingPolicy(OnCheckpointRollingPolicy.build())//滚动策略

.withOutputFileConfig(config)//输出文件配置

.build();

this.basePath = Preconditions.checkNotNull(basePath);
this.encoder = Preconditions.checkNotNull(encoder);
this.bucketAssigner = Preconditions.checkNotNull(assigner);
this.rollingPolicy = Preconditions.checkNotNull(policy);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);

input.addSink(sink);

StreamingFileSink 支持行编码格式和批量编码格式
Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)
创建行或批量编码的 Sink 时,我们需要指定存储桶的基本路径和数据的编码逻辑。

分桶策略:
Flink 有两个内置的 BucketAssigners :

DateTimeBucketAssigner :默认基于时间的分配器
BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
3.滚动策略:

Flink 有两个内置的滚动策略:

DefaultRollingPolicy
partSize

rolloverInterval

inactivityInterval

OnCheckpointRollingPolicy
4.部分文件配置:

(1)默认情况下,部分文件命名格式如下所示:

In-progress / Pending: part-<subtaskIndex>-<partFileIndex>.inprogress.uid
FINISHED: part-<subtaskIndex>-<partFileIndex>
(2)部分文件的索引在每个 subtask 内部是严格递增的(按文件创建顺序)。但是索引并不总是连续的。当 Job 重启后,所有部分文件的索引从 `max part index + 1` 开始, 这里的 `max part index` 是所有 subtask 中索引的最大值。

(3)可以通过 OutputFileConfig 指定部分文件名的前缀和后缀。

源码分析

Flink StreamingFileSink源码解析_lvwenyuan_1的博客-CSDN博客

一个subtask:

对应一个buckects对象和一个RecoverableWriter对象

对应多个bucket对象和bucket对象对应的PartFileWriter

正在上传…
取消

Open方法
调用时机:
干的事情:
根据bucketCheckInterval参数设置定时器
Invoke方法
调用时机:有数据进来
干的事情:
Bucket第一次滚动前:创建或是获取此数据对应的bucket对象,并创建bucket对应的PartFileWriter是对partfile输出流RecoverableFsDataOutputStream的包装:
调用RecoverableFsDataOutputStream RecoverableWriter.open(Path)

向bucket中滚动写数据:调用rollingPolicy.shouldRollOnEvent根据partSize参数进行文件滚动。滚动时:
首先调用closePartFile关闭上周期的文件,变更文件状态为append:
调用PartFileWriter的closeForCommit方法关闭流并返回一个CommitRecoverable对象,将其保存到List<CommitRecoverable>:

调用Committer RecoverableFsDataOutputStream.closeForCommit()

然后调用CommitRecoverable getRecoverable()

创建下周期滚动文件对应的输出流
Bucket<IN, BucketID> onElement(final IN value, final SinkFunction.Context context) throws Exception {
   final long currentProcessingTime = context.currentProcessingTime();

// setting the values in the bucketer context
   bucketerContext.update(
         context.timestamp(),
         context.currentWatermark(),
         currentProcessingTime);

final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
   final Bucket<IN, BucketID> bucket =  getOrCreateBucketForBucketId(bucketId);
   bucket.write(value, currentProcessingTime);

// we update the global max counter here because as buckets become inactive and
   // get removed from the list of active buckets, at the time when we want to create
   // another part file for the bucket, if we start from 0 we may overwrite previous parts.

this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
   return bucket;
}

void write(IN element, long currentTime) throws IOException {
   if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {

if (LOG.isDebugEnabled()) {
         LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
               subtaskIndex, bucketId, element);
      }

rollPartFile(currentTime);
   }
   inProgressPart.write(element, currentTime);
}

private void rollPartFile(final long currentTime) throws IOException {
   closePartFile();

final Path partFilePath = assembleNewPartPath();
   final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
   inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);

if (LOG.isDebugEnabled()) {
      LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
            subtaskIndex, partFilePath.getName(), bucketId);
   }

partCounter++;
}

private CommitRecoverable closePartFile() throws IOException {
   CommitRecoverable committable = null;
   if (inProgressPart != null) {
      committable = inProgressPart.closeForCommit();
      pendingPartsForCurrentCheckpoint.add(committable);
      inProgressPart = null;
   }
   return committable;
}

RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
   return currentPartStream.closeForCommit().getRecoverable();
}

Close方法
调用时机:
干的事情:
onProcessingTime方法
调用时机:根据bucketCheckInterval参数定时器调用
干的事情:
(1)调用rollingPolicy.shouldRollOnProcessingTime根据rolloverInterval和inactivityInterval滚动策略进行文件关闭而不是滚动

void onProcessingTime(long timestamp) throws IOException {
   if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
      if (LOG.isDebugEnabled()) {
         LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
               "(in-progress file created @ {}, last updated @ {} and current time is {}).",
               subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
      }
      closePartFile();
   }
}

5.initializeState 方法

1.调用时机:在每次新建 BucketSink 或者故障恢复时会调用。

2.干的事情:Flink StreamingFileSink源码解析_lvwenyuan_1的博客-CSDN博客

获取此subtask对应的subtaskIndex
创建此subtask对应的buckets对象,构造函数中调用fileSystem的createRecoverableWriter创建RecoverableWriter
调用RecoverableWriter fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();

从BucketStates中恢复处于inprogress状态和pending状态的文件
restoreInProgressFile(bucketState);
恢复处于inprogress状态的文件:此subtask对应的所有bucket对象和bucket对象对应的PartFileWriter.

调用RecoverableFsDataOutputStream RecoverableWriter.recover(ResumeRecoverable resumable)

commitRecoveredPendingFiles(bucketState):
恢复处于pending状态的文件:变为finish状态

首先调用

RecoverableFsDataOutputStream.Committer RecoverableWriter .recoverForCommit(CommitRecoverable resumable)

然后调用Committer的commitAfterRecovery()方法

6.snapshotState方法

调用时机:在每次触发 ck 时会被调用
干的事情:
做快照前检查是否需要滚动,调用DefaultRollingPolicy.shouldRollOnCheckpoint根据partsize参数判断是否需要滚动,满足就会closePartFile。
将所有pending 状态的文件(CommitRecoverable对象)存储到NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint结构中,方便所有task ck完成后修改其状态为 finish
将inProgres状态的文件持久化,并记录 inprogress 文件的大小,以便下次 ck 前发生故障,可以获知本次ck时该文件的大小,以便删除“本次ck后到故障发生时写入的数据”:调用PartFileWriter的persist方法(调用out.hflush()和out.hsync()进行持久化)对已经写入的数据持久化,并返回ResumeRecoverable对象
调用ResumeRecoverable RecoverableFsDataOutputStream.persist()

将当前状态存入Statebackend,以便下次 ck 前发生故障,可以从这个状态处进行恢复。
7.notifyCheckpointComplete方法

调用时机:该方法会在 ck 完成后调用
干的事情:
将 pending 状态的文件转为 final 状态
调用Committer recoverForCommit(CommitRecoverable resumable)

然后调用Committer .commit()

移除writer已经处于close状态的bucket
注意事项

通用注意事项

重要提示 1: 使用 Hadoop < 2.7 时,请使用 OnCheckpointRollingPolicy 滚动策略,该策略会在每次检查点时进行文件切割。 这样做的原因是如果部分文件的生命周期跨多个检查点,当 StreamingFileSink 从之前的检查点进行恢复时会调用文件系统的 truncate() 方法清理 in-progress 文件中未提交的数据。 Hadoop 2.7 之前的版本不支持这个方法,因此 Flink 会报异常。

重要提示 2: 鉴于 Flink 的 sink 以及 UDF 通常不会区分作业的正常结束(比如有限流)和异常终止,因此正常结束作业的最后一批 in-progress 文件不会被转换到 “完成” 状态。

重要提示 3: Flink 以及 StreamingFileSink 不会覆盖已经提交的数据。因此如果尝试从一个包含 in-progress 文件的旧 checkpoint/savepoint 恢复, 且这些 in-progress 文件会被接下来的成功 checkpoint 提交,Flink 会因为无法找到 in-progress 文件而抛异常,从而恢复失败。

重要提示 4: 目前 StreamingFileSink 只支持三种文件系统: HDFS、S3和Local。如果配置了不支持的文件系统,在执行的时候 Flink 会抛出异常。

S3 特有的注意事项

重要提示 1: 对于 S3,StreamingFileSink 只支持基于 Hadoop 的文件系统实现,不支持基于 Presto 的实现。如果想使用 StreamingFileSink 向 S3 写入数据并且将 checkpoint 放在基于 Presto 的文件系统,建议明确指定 “s3a://” (for Hadoop)作为sink的目标路径方案,并且为 checkpoint 路径明确指定 “s3p://” (for Presto)。 如果 Sink 和 checkpoint 都使用 “s3://” 路径的话,可能会导致不可预知的行为,因为双方的实现都在“监听”这个路径。

重要提示 2: StreamingFileSink 使用 S3 的 Multi-part Upload (后续使用MPU代替)特性可以保证精确一次的语义。这个特性支持以独立的块(因此被称为”multi-part”)模式上传文件,当 MPU 的所有部分文件 成功上传之后,可以合并成原始文件。对于失效的 MPUs,S3 提供了一个基于桶生命周期的规则,用户可以用这个规则来丢弃在指定时间内未完成的MPU。 如果在一些部分文件还未上传时触发 savepoint,并且这个规则设置的比较严格,这意味着相关的 MPU在作业重启之前可能会超时。后续的部分文件没 有写入到 savepoint, 那么在 Flink 作业从 savepoint 恢复时,会因为拿不到缺失的部分文件,导致任务失败并抛出异常。

附:Flink和hadoop的集成
Apache Flink 1.11 Documentation: Hadoop 集成

附:Flink shaded
GitHub - apache/flink-shaded: Apache Flink shaded artifacts repository

Flink DeleteOnExitHook上的内存泄漏
http://www.voidcn.com/article/p-yzidqbpd-bwh.html

dli反馈的flink长时间运行条件下checkpoint到obs时引起的OOM问题,解决方案:
1.拷贝hadoop2.8.3版本的LocalDirAllocator类,修改为OBSLocalDirAllocator,修改点如下:

正在上传…
取消

Flink开发遇到的问题
Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.obs.OBSDataBlocks$DiskBlockFactory.createTmpFileForWrite(OBSDataBlocks.java:869)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.obs.OBSDataBlocks$DiskBlockFactory.create(OBSDataBlocks.java:843)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.obs.OBSBlockOutputStream.createBlockIfNeeded(OBSBlockOutputStream.java:223)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.obs.OBSBlockOutputStream.<init>(OBSBlockOutputStream.java:200)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.obs.OBSFileSystem.create(OBSFileSystem.java:943)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)

at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)

at org.apache.flink.fs.obshadoop.FlinkOBSFileSystem.create(FlinkOBSFileSystem.java:128)

at org.apache.flink.fs.obshadoop.FlinkOBSFileSystem.create(FlinkOBSFileSystem.java:23)

at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169)

at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)

at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)

at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)

at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)

at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)

at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)

at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)

at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

at java.lang.Thread.run(Thread.java:748)

开源flink对接华为云OBS相关推荐

  1. 【Go Web】Go对接华为云obs代码

    // UploadFile 文件上传函数 func UploadFile(c *gin.Context) {path := c.Param("path")if !isAllow(p ...

  2. 使用rclone工具实现华为云OBS至AWS S3数据迁移同步

    1. 背景 项目需要将华为云的OBS对象存储服务的存储桶bucket的内容迁移复制到AWS云的S3存储桶中,AWS中暂无实现改需求的云服务,所以采用开源的第三方软件rclone来实现. rclone可 ...

  3. 【福利活动】深度体验OpenHarmony对接华为云IoT

    本文主要介绍基于OpenHarmony 3.0来接入IoTDA,以BearPi-HM_Nano开发板为例,使用huaweicloud_iot_link SDK对接华为云物联网平台的简单流程.文末为大家 ...

  4. 六步带你完成博流wifi模组对接华为云流程

    摘要:本文主要采用基于博流wifi模组以及我们的SDK移植实现华为云的对接,上报数据以及命令下发等,希望对您有所帮助. 1 简介 首先需要研究透彻博流项目的编译运行流程,首先看其根目录中包括compo ...

  5. SpringBoot整合华为云OBS

    一.参考项 华为云 OBS(官网): 对象存储服务OBS官网_海量安全高可靠_数据云存储解决方案-华为云 OBS SDK for Java(官网): SDK下载_对象存储服务 OBS_Java_华为云 ...

  6. sscom串口网络数据调试器使用post方法向华为云obs桶上传文件和图片

    原贴地址:sscom串口网络数据调试器使用post方法向华为云obs桶上传文件和图片-云社区-华为云 [摘要] 之前发了文章"postman使用post方法向华为云obs桶上传文件和图片&q ...

  7. uniapp 上传图片到华为云obs

    记录一下用uniapp上传图片到华为云obs,之前是先把文件传到我们自己的服务器,然后后端的同事再上传到obs,但是我们公司的带宽太低了,传的太太太太慢了,于是考虑直接让用户上传到obs,不经过我们自 ...

  8. 前端js华为云obs断点续传上传

    前端js华为云obs断点续传上传 断点续传上传就是将待上传的文件分成若干份分别上传,并实时地将每段上传结果统一记录在断点续传记录对象中,仅当所有分段都上传成功时返回上传成功的结果,否则在回调函数中返回 ...

  9. 华为云OBS文件上传下载工具类

    Java-华为云OBS文件上传下载工具类 文章目录 Java-华为云OBS文件上传下载工具类 1.华为云obs文件上传下载 2.文件流转MultipartFile 3.File转换为Multipart ...

最新文章

  1. android 教程 最新版,Android最新版本开发环境搭建图文教程
  2. 函数空间中的最佳逼近
  3. 阿里百川与极客邦科技达成战略合作 Weex宣布开源
  4. Ubuntu18.04安装JDK1.8和maven3
  5. 阿里新一代分布式任务调度平台Schedulerx2.0破土而出
  6. Shell编程:shell script 的追踪与 debug
  7. 数据库事务4种隔离级别和7种传播行为
  8. 【matlab深度学习工具箱】classificationLayer参数详解
  9. 遍历Map集合的4种常用方法
  10. 电信中兴B860AV2.1-T_线刷固件包
  11. 自动驾驶技术-环境感知篇:基于视觉相关技术介绍
  12. QTableView结构及用法
  13. kd树 python实现_Python语言描述KNN算法与Kd树
  14. python ocr识别 沪牌_7月沪牌拍后分析:毫秒之间,锁定中标
  15. 小牛的net程序开发之路
  16. 计算机课睡觉检讨书,上课睡觉检讨书范文三篇
  17. k8s出现问题导致cpu使用率过高
  18. 制造行业实施作业成本法案例(AMT 邓为民)
  19. 平方根计算在加速度传感器中的应用
  20. char在mysql中的意思_mysql中char表示什么意思

热门文章

  1. 现场直播计分识别软件Crack:Scoreboard OCR
  2. Java——黑马程序员双色球案例
  3. CSS之元素的水平方向的布局
  4. YII 开启URL伪静态方法(yii中urlManager匹配和注意点)
  5. 如何查询GBase数据库中表的comment信息
  6. Weka内置特征选择算法整理
  7. 专访吕毅:链家网技术架构的演进之路
  8. 74期:涂鸦科技A轮获千万美元,无专职运维!
  9. 安全模式启动服务安全模式加载驱动.sys
  10. 坐标系定义和相互转换算法