为什么80%的码农都做不了架构师?>>>   

摘要: MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 fa.

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps

MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 failover 的工作交给了客户端。

用户在使用 Tunnel SDK 编程时,需要对 block 这一层的语义进行认知,并且驱动数据上传的整个过程[1],并且自己进行容错,毕竟『网络错误是正常而不是异常』。由于用户文档中并没有强调这一点的重要性,导致很多用户踩了坑,一种常见的出错场景是,当客户端写数据的速度过慢,两次 write 的间隔超时[2],导致整个 block 上传失败。

High Level API

MaxCompute Java SDK 在 0.21.3-public  之后新增了 BufferredWriter 这个更高层的 API,简化了数据上传的过程,并且提供了容错的功能。 BufferedWriter 对用户隐藏了 block 这个概念,从用户角度看,就是在 session 上打开一个 writer 然后往里面写记录即可:

RecordWriter writer = null;try {int i = 0;  writer = uploadSession.openBufferedWriter();Record product = uploadSession.newRecord();for (String item : items) {product.setString("name", item);product.setBigint("id", i);writer.write(product);i += 1;}
} finally {if (writer != null) {writer.close();}
}
uploadSession.commit();

具体实现时 BufferedWriter 先将记录缓存在客户端的缓冲区中,并在缓冲区填满之后打开一个 http 连接进行上传。BufferedWriter 会尽最大可能容错,保证数据上传上去。

  • 由于屏蔽了底层细节,这个接口可能并不适合数据预划分、断点续传、分批次上传等需要细粒度控制的场景。

多线程上传示例

多线程上传时,每个线程只需要打开一个 writer 往里面写数据就行了。

class UploadThread extends Thread {private UploadSession session;private static int RECORD_COUNT = 1200;public UploadThread(UploadSession session) {this.session = session;}@Overridepublic void run() {RecordWriter writer = up.openBufferedWriter();Record r = up.newRecord();for (int i = 0; i < RECORD_COUNT; i++) {r.setBigint(0, i);writer.write(r);}writer.close();}
};public class Example {public static void main(String args[]) {// 初始化 MaxCompute 和 tunnel 的代码TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(projectName, tableName);UploadThread t1 = new UploadThread(up);UploadThread t2 = new UploadThread(up);t1.start();t2.start();t1.join();t2.join();uploadSession.commit();}

更多控制

重试策略

由于底层在上传出错时会回避一段固定的时间并进行重试,但如果你的程序不想花太多时间在重试上,或者你的程序位于一个极其恶劣的网络环境中,为此 TunnelBufferedWriter 允许用户配置重试策略。

用户可以选择三种重试回避策略:指数回避(EXPONENTIAL_BACKOFF)、线性时间回避(LINEAR_BACKOFF)、常数时间回避(CONSTANT_BACKOFF)。

例如下面这段代码可以将,write 的重试次数调整为 6,每一次重试之前先分别回避 4s、8s、16s、32s、64s 和 128s(从 4 开始的指数递增的序列)。

RetryStrategy retry = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
writer.setRetryStrategy(retry);

缓冲区控制

如果你的程序对 JVM 的内存有严格的要求,可以通过下面这个接口修改缓冲区占内存的字节数(bytes):

writer.setBufferSize(1024*1024);

默认配置每一个 Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上传一个 block 的数据[3]。

多个进程共享 Session

由于一个 Session 的上传状态是通过维护一个 block list 实现的,对于多线程程序来讲,通过锁很容易实现资源的分配。但对于两个进程空间里的程序想要复用一个 Session 时,必须通过一种机制对资源进行隔离。

具体地,在 getUploadSession 的时候,必须指定这个共享这个 Session 的进程数目,以及一个用来区分进程的 global id:

//程序1:这个 session 将被两个 writer 共享,我是其中第 0 个
TableTunnel.UploadSession up = tunnel.getUploadSession(projectName, tableName, sid, 2, 0);
writer = session.openBufferedWriter();//程序1:这个 session 将被两个 writer 共享,我是其中第 1 个
TableTunnel.UploadSession up = tunnel.getUploadSession(projectName, tableName, sid, 2, 1);
writer = session.openBufferedWriter();

Notes

[1] 一次完整的上传流程通常包括以下步骤:

先对数据进行划分
为每个数据块指定 block id,即调用 openRecordWriter(id)
然后用一个或多个线程分别将这些 block 上传上去
并在某个 block 上传失败以后,需要对整个 block 进行重传
在所有 block 都上传以后,向服务端提供上传成功的 blockid list 进行校验,即调用 session.commit([1,2,3,...])
[2] 因为使用长连接,服务端有计时器判断是否客户端是否 alive

[3] block 在服务端有 20000 个的数量上限,如果 BufferSize 设得太小会导致 20000 个 block 很快被用光

[4] Session的有效期为24小时,超过24小时会导致数据上传失败

原文链接

转载于:https://my.oschina.net/yunqi/blog/1785569

MaxCompute Tunnel SDK数据上传利器——BufferedWriter使用指南相关推荐

  1. 文件上传利器SWFUpload使用指南

    2019独角兽企业重金招聘Python工程师标准>>> 文件上传利器SWFUpload使用指南 SWFUpload是一个flash和js相结合而成的文件上传插件,其功能非常强大.以前 ...

  2. 文件上传利器SWFUpload使用指南(转)

    http://www.cnblogs.com/2050/archive/2012/08/29/2662932.html 文件上传利器SWFUpload使用指南 SWFUpload是一个flash和js ...

  3. Plupload 上传控件使用指南

    本文转载至(感谢原作者分享):http://www.cnblogs.com/2050/p/3913184.html#plupload_doc2 我之前写过一篇文章<文件上传利器SWFUpload ...

  4. STM32控制NB-IOT将数据上传至微信与网站显示

    利用nb-iot的Modbus协议将获取的数据发至服务器,通过微信与网站显示 前段时间给同学做了一个小项目.基于stm32103获取空气质量数据,温湿度等信息,然后利用nb-iot上传.之前一直听过这 ...

  5. java云控_云控 数据上传

    新旧云控,新旧上传SDK的实现和区别 0]旧云控的功能控制是使用FLAG控制,云下发flags: Whetstone/app/klorobot/src/com/xxxx/klo/bugreport/u ...

  6. 2020-08-20 将数据上传到 S3 或从S3下载

    将数据上传到 S3 在上个 notebook 中,你应该使用给定剽窃/非剽窃文本数据语料库的特征和类别标签创建了两个文件:training.csv 和 test.csv 文件. 以下单元格将加载一些 ...

  7. 【AllJoyn专题】基于AllJoyn和Yeelink的传感器数据上传与指令下行的研究

    接触高通物联网框架AllJoyn不太久,但确是被深深地吸引了.在我看来,促进我深入学习的原因有三点:一.AllJoyn开源,对开源的软硬件总会有种莫名的喜爱,虽然或许不会都深入下去:二.顺应潮流,物联 ...

  8. m5310模组数据上传至onenet_NBIOT模组M5310接入中国移动物联网开放平台示例文档

    <time>: -t 设备存活时间,标示终端和 OneNET 平台之间连接的存活周 期,设置范围为 10s~86400s: : -u 设置 PUT 和 POST 指令分片长度,范围 0~6 ...

  9. 检查文件上传完成_“我的数据上传NCBI又报错了...” “攻略拿去!”

    在上一期的内容中,我们分享了NCBI测序数据上传的主要步骤和资料填写的注意事项.今天跟大家分享最后一步:原始测序数据的上传以及上传后项目编号的相关类型和含义. 图1 NCBI测序数据上传步骤 | 原始 ...

最新文章

  1. mysql忘记密码如何修改
  2. string类的用法详解
  3. oracle ob 使用基础之基础
  4. 内存泄漏的常见应用领域
  5. Gartner公布2017年十大战略科技发展趋势
  6. php使用5.2.,请问php5.2.5版本的$_FILES函数的用法?
  7. 弹性架构_实践中的弹性基础架构
  8. 面向对象的设计原则-类设计原则
  9. 程序员的“数学修炼手册”,帮你快速恶补数学知识 | 资源
  10. titanic数据集_数据可视化泰坦尼克号图表预测
  11. kotlin-中文免费文档(后台,android,前端)
  12. Inceptor UDF
  13. 计算机怎么配置IP地址,如何设置电脑IP地址?
  14. 使用Python第三方库requests和bs4 爬取必应首页的图片,并存储到系统对应位置
  15. 当Analyzer 2007 遇上.Net 3.0时,可能会秀才爱上兵
  16. DFT中常用英文缩写,词汇及详解(每日持续更新)
  17. 测试用例(测试用例的编写、评审和管理)
  18. matlab非平稳信号小波和FFT去噪
  19. Hive 3.1.2Linux CentOs 安装,踩坑 Dbeaver 连接Hive
  20. JAVA-@Primary的常用方式

热门文章

  1. python图片转字符画
  2. 首尾连接的数组的求和问题
  3. Data Lake Analytics: 读/写PolarDB的数据
  4. Codeforces Round #539 (Div. 1)
  5. LightOJ 1084 Winter(记忆化搜索)
  6. Android 本地搭建Tomcat服务器供真机测试
  7. PHP算法之四大基础算法
  8. python类中成员的的调用
  9. git常见使用场景总结
  10. linux bash: sqlplus: command not found 错误处理