场景描述
有这样一种场景,用户在自建服务器上存有一定数量级的CSV格式业务数据,某一天用户了解到阿里云的OSS服务存储性价比高(嘿嘿,颜值高),于是想将CSV数据迁移到云上OSS中,并且未来还想对这些数据做一些离线分析,挖掘其中存在价值,因此需要将OSS中文件再通过一种方式同步到ODPS数加平台上,面对这样需求,小编我经过参考文档,实践,调试并修复Bug,实现出以下一种解决方案。实现目标
通过OSS的Java SDK以及批量数据通道tunnel SDK实现以下两个功能:
(1)将本地CSV文件上传到OSS;
(2)将OSS中文件同步到ODPS;
准备工作
在具体实操之前,有必要对OSS有个了解,OSS是个什么东东,为什么要选用OSS呢,OSS控制台限制条件,需要注意事项?
OSS是个什么东东?
阿里云对象存储(Object Storage Service,简称OSS),是阿里云对外提供的海量,安全,低成本,高可靠的云存储服务。通过网络随时存储和调用包括文本、图片、音频、和视频在内的各种结构化或非结构化数据文件。
为什么选用云产品OSS服务呢?
是什么原因致使用户放弃使用自建服务器存储数据,而转向云产品OSS呢?
这方面我深有感触,我以前在上海一家公司工作,原公司所有数据都是存放在自建的五六台服务器上,从规划,采购到部署,这其间过程复杂,人力部署也不简单,而且服务器价格昂贵,开发维护成本高,数据可靠性还低,总之耗时、耗力最重要是影响业务进展。接触了解到OSS后才发现,之前的自建服务器存储真是太out啦,呵呵,OSS颜值高额,这里颜值具体有以下几个方面:
可靠性高:数据自动多重冗余备份,规模自动扩展,不影响对外服务;
安全:提供企业级、用户级多层次安全保护,授权机制及白名单、防盗链、主子账号功能;
成本:省去人工扩容硬盘以及运维成本;
数据处理能力:提供丰富的数据处理服务,比如图片处理、视频转码、CDN内容加速分发。
OSS控制台限制条件?
通过 OSS 控制台可以上传小于 500 MB 文件。如要上传的文件大于 500 MB,控制台会给出超过大小限制警告,并且在任务管理列表,失败并尝试上传请求三次。异常警告如下图所示:

解决方法:可以通过 OSS的SDK 进行上传。需要注意几点
(1) 在OSS中,用户操作基本数据单元是object,单个对象大小限制为48.8TB,一个存储空间中可以有无
限量对象。
(2) 新建Bucket,输入存储空间名称,创建后不支持更改存储空间名称,上传到OSS后不能移动文件存储位
置;
(3) 在所属地域框中,下拉选择该存储空间的数据中心。订购后不支持更换地域。
(4) 删除存储空间之前请确保尚未完成的分片上传文件产生的碎片文件全部清空,否则无法删除存储空间。
(5) 通过web控制台上传文件,一刷新页面,任务管理中显示的上传任务就会消失不见,所以在上传过程中
不要刷新页面。
本地大文件分片上传到OSS
因为使用单次HTTP请求,Object过大会导致上传时间长。在这段时间出现网络原因造成超时或者链接断开错误的时候,上传容易失败,可以考虑断点续传上传(分片上传)。当Object大于5GB,这种情况下只能使用断点续传上传(分片上传),具体参考断点续传上传,下面代码实现上传本地路径下ratings.csv文件到OSS object管理中: 见附件中 源代码.rar 压缩文件中的 MultipartUploadDemo 类实现
单线程实现将OSS文件上传至ODPS(OSS java-SDK与tunnel SDK结合)
下面代码实现目标:将OSS中bucket名为qf-test,object对象为ratings.csv文件数据导入到ODPS平台中项目名为dtstack_dev,表名为ratings,分区字段为ds=20160612中。 见附件中 源代码.rar 压缩文件中的 OSSToODPS_Upload 类实现
多线程实现将OSS文件上传至ODPS(OSS java-SDK与tunnel SDK结合) 下面代码实现目标:将OSS中bucket名为qf-test,object对象为data_test/movies.csv文件数据导入到ODPS平台中项目名为dtstack_dev,表名为movies_odps2中。

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.aliyun.odps.Column; import com.aliyun.odps.Odps; import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.TableSchema; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.data.Record; import com.aliyun.odps.data.RecordWriter; import com.aliyun.odps.tunnel.TableTunnel; import com.aliyun.odps.tunnel.TableTunnel.UploadSession; import com.aliyun.odps.tunnel.TunnelException; import com.aliyun.oss.OSSClient; import com.aliyun.oss.model.GetObjectRequest; import com.aliyun.oss.model.OSSObject; class UploadThread implements Callable<Boolean> { private long id; private TableSchema schema = null; private RecordWriter recordWriter = null; private Record record = null; private BufferedReader reader = null; public UploadThread(long id, RecordWriter recordWriter, Record record,                          TableSchema schema,BufferedReader reader) { this.id = id; this.recordWriter = recordWriter; this.record = record; this.schema = schema; this.reader = reader; } public Boolean call() throws Exception { while (true) {                    String line = reader.readLine(); if (line == null) break; if(id == 0){ //第一行是字段名,忽略掉                           id++; continue; }                  System.out.println(line);                   String[] s = line.split(","); for (int i = 0; i < schema.getColumns().size(); i++) {                       Column column = schema.getColumn(i); switch (column.getType()) { case BIGINT:                              record.setBigint(i, Long.valueOf(s[i])); break; //               case BOOLEAN: //                       record.setBoolean(i, str); //                       break; //               case DATETIME: //                       record.setDatetime(i, str); //                       break; case DOUBLE:                                record.setDouble(i, Double.valueOf(s[i])); break; case STRING:                              record.setString(i,s[i]); break; default: throw new RuntimeException("Unknown column type: " + column.getType()); } }                    recordWriter.write(record); }               recordWriter.close(); return true; } } public class OSSToODPS_UploadThread { private static String accessKeyId = "UQV2yoSSWNgquhhe"; private static String accessKeySecret = "bG8xSLwhmKYRmtBoE3HbhOBYXvknG6"; private static String endpoint = "http://oss-cn-hangzhou.aliyuncs.com"; private static String bucketName = "qf-test"; private static String key = "data_test/movies.csv"; private static String tunnelUrl = "http://dt.odps.aliyun.com"; private static String odpsUrl = "http://service.odps.aliyun.com/api"; private static String project = "dtstack_dev"; private static String table = "movies_odps2"; //private static String partition = "ds=20160612"; private static int threadNum = 10; public static void main(String args[]) { /*                   * Constructs a client instance with your account for accessing OSS                  */                 OSSClient client = new OSSClient(endpoint, accessKeyId, accessKeySecret);                  System.out.println("Downloading an object");                  OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));                BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));                                    Account account = new AliyunAccount(accessKeyId, accessKeySecret);                  Odps odps = new Odps(account);                  odps.setEndpoint(odpsUrl);                  odps.setDefaultProject(project); try {                          TableTunnel tunnel = new TableTunnel(odps);                          tunnel.setEndpoint(tunnelUrl); //PartitionSpec partitionSpec = new PartitionSpec(partition);                          UploadSession uploadSession = tunnel.createUploadSession(project,table); //                       UploadSession uploadSession = tunnel.createUploadSession(project, //                                 table, partitionSpec);  //分区                           System.out.println("Session Status is : " + uploadSession.getStatus().toString());                           ExecutorService pool = Executors.newFixedThreadPool(threadNum);                          ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>(); for (int i = 0; i < threadNum; i++) {                                  RecordWriter recordWriter = uploadSession.openRecordWriter(i);                                  Record record = uploadSession.newRecord();                                  callers.add(new UploadThread(i, recordWriter, record,                                                  uploadSession.getSchema(),reader)); }                          pool.invokeAll(callers);                          pool.shutdown();                           Long[] blockList = new Long[threadNum]; for (int i = 0; i < threadNum; i++)                                  blockList[i] = Long.valueOf(i);                          uploadSession.commit(blockList);                          reader.close();                          System.out.println("upload success!"); } catch (TunnelException e) {                          e.printStackTrace(); } catch (IOException e) {                          e.printStackTrace(); } catch (InterruptedException e) {                          e.printStackTrace(); } } }

编程实现中遇到BugApache httpclient包冲突
Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE at org.apache.http.conn.ssl.SSLConnectionSocketFactory.<clinit>(SSLConnectionSocketFactory.java:144)
at com.aliyun.oss.common.comm.DefaultServiceClient.createHttpClientConnectionManager(DefaultServiceClient.java:232)
at com.aliyun.oss.common.comm.DefaultServiceClient.<init>(DefaultServiceClient.java:78)
at com.aliyun.oss.OSSClient.<init>(OSSClient.java:273)
at com.aliyun.oss.OSSClient.<init>(OSSClient.java:194)
at UploadToODPS.main(UploadToODPS.java:53)
工程里可能有包冲突。原因是OSS Java SDK使用了Apache httpclient 4.4.1,而个人工程使用了与Apache httpclient 4.4.1冲突的Apache httpclient。如上述发生错误的工程里,使用了Apache httpclient 4.1.2:
使用统一版本。如果个人工程里使用与Apache httpclient 4.4.1冲突版本,请也使用4.4.1版本。去掉其它版本的Apache httpclient依赖。

recordWriter.write(record) 写入位置不正确 在单线程编码实现从OSS传数据到ODPS代码中 recordWriter.write(record) 写入位置不正确,如下代码显示: for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, Long.valueOf(s[i]));
break;
case DOUBLE:
record.setDouble(i, Double.valueOf(s[i]));
break;
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
recordWriter.write(record); //写入位置不正确
}
}
// recordWriter.write(record); //放到for循环外,写入位置正确
recordWriter.write(record)写入位置不对,将recordWriter.write(record)放置到for循环内,会出现以下奇怪异常:

正确位置是:将recordWriter.write(record)放置到for循环外,结果如下表显示:

上传代码中 partition="20160612" 字符串写法不对
需要注意,指定分区字符串在程序中正确写法:
private static String partition = "ds=20160612"; (必须加上分区字段名)
PartitionSpec partitionSpec = new PartitionSpec(partition);
不正确写法如下:
private static String partition = "20160612";(缺少分区字段名)多线程上传任务无故中断,如下是异常截图

通过多线程将OSS中文件同步到ODPS表中时,实现多任务的并发执行,在编码实现时要注意reader.close()位置要放正确:
UploadSession uploadSession = tunnel.createUploadSession(project,table, partitionSpec);
OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));
BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));
Long[] blockList = new Long[threadNum];
uploadSession.commit(blockList);
将reader.close()放到Callable接口中call()方法里是不对滴,call方法是线程异步执行地方,开启的所有线程不断地异步从OSS的缓冲字符输入流reader中读取OSS中数据,如果在call()方法中就将reader关闭,也就是说将输入数据源关闭,直接导致线程读取失败。因此reader.close()应该放在线程外部,即uploadSession.commit()位置后边,如下。 uploadSession.commit(blockList);reader.close(); //正确位置System.out.println("upload success!");
附件下载: 源代码.rar[dream2751027].1465828697.rar

作者:dream2751027

原文链接

更多云计算干货敬请关注知乎号:大数据小编-知乎

图片上传之后清空_OSS文件上传及OSS与ODPS之间数据连通相关推荐

  1. 00截断上传绕过_【文件上传与解析】文件上传与解析漏洞总结v1.0

    点击上方"公众号" 可以订阅哦! Hello,各位小伙伴晚上好~ 这里是依旧勤劳写公众号的小编~ 今天本公众号将推出一个新的模块,那就是漏洞知识点总结模块!!!(此处应有掌声~) ...

  2. php怎么上传函数,PHP单文件上传原理及上传函数的封装操作示例

    搜索热词 @H_404_0@本文实例讲述了PHP单文件上传原理及上传函数的封装操作.分享给大家供大家参考,具体如下: @H_404_0@表单: @H_404_0@0.PHP: 无标题文档 请选择您要上 ...

  3. web文件上传(一)--文件上传与json上传区别及方法

    Web文件上传方法总结大全 上传文件与与上传数据区别 上传数据主要指json等简单字符串,上传文件指的是上传word.excel图片等.在上传数据的时候enctype默认为第一个application ...

  4. SSM框架使用Layui文件上传插件实现多文件上传(多文件列表)

    SSM框架使用Layui文件上传插件实现多文件上传(多文件列表) pom.xml文件的配置 想要实现SSM框架实现多文件上传,必要的jar包必须要在pom.xml文件中引入.如下: <!--co ...

  5. 使用Apache文件上传控件实现文件上传

    本文使用Apache提供的第三方文件上传控件进行文件上传 1.导入第三方commons-fileupload-1.3.2.jar和commons-io-2.5.jar包 2.页面form标签需添加en ...

  6. 【文件上传漏洞-01】文件上传漏洞概述、防御以及WebShell基础知识补充

    目录 1 文件上传漏洞概述 2 文件上传漏洞防御.绕过.利用 2.1 黑白名单策略 3 WebShell基础知识补充 3.1 WebShell概述 3.2 大马与小马 1 文件上传漏洞概述 概述:文件 ...

  7. asp.net实现ftp上传代码(解决大文件上传问题)

    asp.net实现ftp上传代码(解决大文件上传问题) 参考文章: (1)asp.net实现ftp上传代码(解决大文件上传问题) (2)https://www.cnblogs.com/LYunF/ar ...

  8. 文件上传linux服务器,Linux 文件上传Linux服务器

    进入命令行 在图形化桌面出现之前,与Unix系统进行交互的唯一方式就是借助由shell所提供的文本命令行界面(command line interface,CLI).CLI只能接受文本输入,也只能显示 ...

  9. SharePoint 2010 ——自定义上传页面与多文件上传解决方案

    SharePoint 2010 --自定义上传页面与多文件上传解决方案 参考文章: (1)SharePoint 2010 --自定义上传页面与多文件上传解决方案 (2)https://www.cnbl ...

最新文章

  1. boost::gil模块实现dynamic image的测试程序
  2. An example of using Pandas for regression
  3. leetcode-Excel Sheet Column Title
  4. 元类--用不上的先了解
  5. 每周四JEECG社区公开课:微信公众账号运营(jeewx使用)入门讲解
  6. 智能终端会议系统(20)---网络视频传输协议--RTP/RTCP/RTSP/SIP/SDP 之间关系
  7. centos安装python3.5_CentOS 7安装Python3.5
  8. 传奇电子cq9跳高高、跳起来规则与操作技巧
  9. 强大的发包工具fine packet builder
  10. ubuntu18.04 安装nvidia显卡驱动
  11. Java架构师成长之道之计算机组成原理概述篇
  12. 计算机二级数据模拟表,2020年计算机二级《Access数据库程序设计》模拟题(5)...
  13. 在本地计算机无法启动uGs,SIEMENS_NX 免安装版
  14. 物流快递管理系统源码
  15. kali利用fluxion无线网络钓鱼
  16. 夜深人静写算法【递归】
  17. java ssl 双向认证_java实现 SSL双向认证
  18. 给定连接查询ems配送信息
  19. 计算机课程打字教学,打字教程第1课 认识键盘
  20. 《HelloGitHub》第 79 期

热门文章

  1. 阿联酋是发达国家还是发展中国家
  2. 工作总结的写作方法与要领
  3. 2013校作息时间表暂存
  4. 730版本去掉恼人的提示信息
  5. 百度健康打通医药电商服务
  6. 计算机系统最大的加速能力,系统加速我用Windows系统四大自带工具 -电脑资料
  7. 唐诗三百首加密软件如何使用_视频加密一机一码软件该如何选择?有哪些因素影响?...
  8. java 执行cd_Java调用Linux命令(cd的处理)
  9. java session 生命周期_JavaWeb关于session生命周期的几种设置方法
  10. glance-50(秒解秒懂)