简介:DataWorks作为飞天大数据平台操作系统,历经11年发展,形成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在使用产品的过程中希望他们的本地服务能够和阿里云上的DataWorks服务进行交互,从而提升企业大数据处理的效率,减少人工操作和运维工作,降低数据风险和企业成本,现在DataWorks开放OpenAPI能力满足企业的定制化需求。

DataWorks作为飞天大数据平台操作系统,历经11年发展,形成了涵盖数据集成、数据开发、数据治理、数据服务的一站式大数据开发治理平台。很多企业用户在使用产品的过程中希望他们的本地服务能够和阿里云上的DataWorks服务进行交互,从而提升企业大数据处理的效率,减少人工操作和运维工作,降低数据风险和企业成本,现在DataWorks开放OpenAPI能力满足企业的定制化需求。
DataWorks OpenAPI涵盖租户、元数据、数据开发、运维中心、数据质量、数据服务等DataWorks核心能力,企业版和旗舰版分别赠送100万次/月、1000万次/月的免费调用额度。

关于Dataworks OpenAPI开通要求和开放地域可查阅DataWorks OpenAPI概述
限DataWorks企业版及以上使用立即开通
开通7天试用请使用钉钉扫码联系

实战简介

我们假设这样一个简单的场景,开发人员想把RDS库里面的数据同步到一张MaxCompute分区表中,然后在自建系统的页面上展示经过数据分析后的报表数据,那么如何通过DataWorks OpenAPI去完成整个链路的实现呢?

实战准备

一、引入DataWorks OpenAPI SDK

这一部分可参考 安装 DataWorks OpenAPI Java SDK,除了java语言,我们还支持Python,PHP,C#,Go 等语言支持。默认情况下我们不需要显式去指定DataWorks OpenAPI的EndPoint,但是如果aliyun-java-sdk-core版本偏低的情况下可能会找不到DataWorks OpenAPI的Endpoint,这时候可在不升级版本的情况下通过使用如下代码进行请求。

    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");IAcsClient client = new DefaultAcsClient(profile);

如上代码是显式地指定了DataWorks OpenAPI的EndPoint,dataworks.${regionId}.aliyuncs.com这样的域名格式在公网环境下可访问,但是有些用户需要在VPC环境下调用OpenAPI,那么则需要把域名dataworks.${regionId}.aliyuncs.com 变更成 dataworks-vpc.${regionId}.aliyuncs.com,这样在VPC网络环境下即使不能访问公网也能请求到DataWorks OpenAPI。
如果您不清楚regionId(地域ID)的概念,可参考地域和可用区。

二、了解DataWorks OpenAPI文档

详细阅读DataWorks OpenAPI文档对开发非常有帮助,做API开发时如果对参数的约束不太理解时可参考DataWorks OpenAPI文档,里面对每个出入参、参数示例、错误码描述都有详细的解释。点击查看API参考>>

实战步骤

步骤一:创建RDS数据源

集成租户API可创建引擎、创建数据源、查看项目空间等信息。在我们这个业务场景中,MaxCompute分区表存在于MaxCompute引擎中,我们在DataWorks管控台创建完MaxCompute工作空间后会自动创建好MaxCompute引擎的数据源,所以我们只需要使用【CreateConnection】创建好RDS数据源即可:

        CreateConnectionRequest createRequest = new CreateConnectionRequest();createRequest.setProjectId(-1L);createRequest.setName("TEST_CONNECTION");createRequest.setConnectionType("MYSQL");createRequest.setEnvType(1);createRequest.setContent("{\"password\":\"12345\"}");Long connectionId;try {CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);Assert.assertNotNull(createResponse.getData());connectionId = createResponse.getData();UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();updateRequest.setConnectionId(connectionId);updateRequest.setDescription("1");UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);Assert.assertTrue(acsResponse.getData());DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();deleteRequest.setConnectionId(connectionId);DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);Assert.assertTrue(deleteResponse.getData());} catch (ClientException e) {e.printStackTrace();Assert.fail();}

UpdateConnection和DeleteConnection可分别修改和删除数据源信息。另外对项目空间的成员进行管理的API集是CreateProjectMember、DeleteProjectMember、RemoveProjectMemberFromRole、ListProjectMembers。

步骤二:表的创建

集成DataWorks元数据OpenAPI我们能管理引擎侧的表信息,通过DataWorks管控台和租户API我们完成了MaxCompute引擎和RDS数据源的创建工作,下一步需要完成表的创建,可通过元数据的【CreateTable】完成:

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");IAcsClient client = new DefaultAcsClient(profile);CreateTableRequest request = new CreateTableRequest();request.setTableName("table_test");request.setColumnss(new ArrayList<>());request.setEndpoint("endpoint");CreateTableResponse response = client.getAcsResponse(request);String nextTaskId = response.getTaskInfo().getNextTaskId();System.out.println(nextTaskId);

关于表管理的API集是CreateTable、UpdateTable、DeleteTable、GetMetaDBTableList、CheckMetaTable等,除了可对表进行管理,元数据API还能对表元数据、表主题进行管理,更多详情可参考DataWorks OpenAPI文档。

步骤三:任务开发和发布调度

集成数据开发API可管理文件,并对文件进行提交和发布后生成周期任务,周期任务会定时调度运行,创建不同类型的文件是根据FileType这个字段决定的,目前我们已支持非常多的FileType,通过运维中心的API【ListProgramTypeCount】可获取所有已支持的系统节点以及自定义节点。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");IAcsClient client = new DefaultAcsClient(profile);CreateFileRequest createFileRequest = new CreateFileRequest();createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());createFileRequest.setInputList(projectIdentifier+"_root");createFileRequest.setContent(content);createFileRequest.setFileName("create_file_" + caseId);createFileRequest.setFileFolderPath("业务流程/POP接口测试/MaxCompute/test_folder_3");createFileRequest.setFileDescription("create file " + caseId);createFileRequest.setRerunMode("ALL_ALLOWED");CreateFileResponse createFileResponse = getAcsResponse(createFileRequest);

content字段存储SQL脚本、Shell脚本、数据集成的脚本代码,数据集成的脚本格式可参考通过脚本模式配置任务。
使用【CreateFile】创建完脚本后,如需修改可使用UpdateFile、DeleteFile进行管理。和页面上的操作流程一致的是完成文件开发后得提交和发布文件才会生成周期实例,这里要注意的是需要轮询SubmitFile返回的 DeploymentId,只有当GetDeployment返回的状态是完成时(status.finished())才表示部署成功。

        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", "XXX", "xxx");IAcsClient client = new DefaultAcsClient(profile);SubmitFileRequest request = new SubmitFileRequest();request.setFileId(fileId);request.setComment("submit file");SubmitFileResponse submitFileResponse = getAcsResponse(submitFileRequest);//检查提交结果DeploymentStatus status = null;GetDeploymentResponse.Data.Deployment deployment = null;int retryTimes = 0;while (retryTimes < 6) {GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(submitFileResponse.getData());GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());Assert.assertNotNull(getDeploymentResponse.getData());deployment = getDeploymentResponse.getData().getDeployment();Assert.assertNotNull(deployment);Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));status = Enums.find(DeploymentStatus.class, deployment.getStatus());Assert.assertNotNull(status);if (status.finished()) {LOGGER.info("Deployment finished - FinalStatus[{}]", status);break;}LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);retryTimes++;SleepUtils.seconds(10L);}

如果是在标准模式的项目下开发,提交完成后,还需要发布文件才能最终提交到调度成为周期任务。发布文件使用DeployFile,和提交文件一样,也需要使用GetDeployment轮询部署状态。

    DeployFileRequest request = new DeployFileRequest();request.setFileId(fileId);request.setComment("deploy file");DeployFileResponse deployFileResponse = getAcsResponse(deployFileRequest);//检查发布部署结果DeploymentStatus status = null;GetDeploymentResponse.Data.Deployment deployment = null;int retryTimes = 0;while (retryTimes < 6) {GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());Assert.assertNotNull(getDeploymentResponse.getData());deployment = getDeploymentResponse.getData().getDeployment();Assert.assertNotNull(deployment);LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",deploymentId, new Gson().toJson(deployment));Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));status = Enums.find(DeploymentStatus.class, deployment.getStatus());Assert.assertNotNull(status);if (status.finished()) {LOGGER.info("Deployment finished - FinalStatus[{}]", status);break;}LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);retryTimes++;SleepUtils.seconds(10L);}

数据开发API除了可对文件管理外,还能管理文件夹、资源、函数,更多详情可参考DataWorks OpenAPI文档。

步骤四:配置运维监控

通过API完成周期任务的生产之后,会在DataWorks平台每天生成调度实例被定时调度运行,使用运维中心API可对周期任务和周期实例进行运维操作,可通过GetNode、GetInstance、ListInstances等API查看周期任务和周期实例,监控实例运行情况。

        GetInstanceRequest request = new GetInstanceRequest();request.setInstanceId(INSTANCE_ID);request.setProjectEnv(PROJECT_ENV);try {GetInstanceResponse response = client.getAcsResponse(request);Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());Assert.assertEquals("kzh", bizInstanceDto.getNodeName());Assert.assertEquals("", bizInstanceDto.getParamValues());Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());} catch (Exception e) {e.printStackTrace();Assert.fail();}

如果实例运行异常可通过RestartInstance、SetSuccessInstance、SuspendInstance、ResumeInstance处理。
使用CreateRemind、UpdateRemind等API可创建自定义报警规则,确保每天基线顺利产出,一旦异常可告警通知到人工,然后介入。

        CreateRemindRequest createRemindRequest = new CreateRemindRequest();createRemindRequest.setRemindName("REMIND_CREATE_TEST");createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());createRemindRequest.setRemindType(RemindType.ERROR.name());createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());createRemindRequest.setDndEnd("08:00");createRemindRequest.setNodeIds("-1");createRemindRequest.setMaxAlertTimes(1);createRemindRequest.setAlertInterval(1800);createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());createRemindRequest.setAlertTargets(MosadConstants.POP_UID);try { CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));Assert.assertTrue(createResponse.getData() > 0);} catch (Exception ex) {ex.printStackTrace();return;}

运维中心主要提供周期任务、手动业务流程、基线查询、告警配置和查询等相关API,可参考DataWorks OpenAPI文档。

步骤五:配置数据质量监控

在这个业务场景中,我们通过前面介绍的API已经可以每天定时把数据从RDS同步到MaxCompute的表中了。如果我们担心产生脏数据或者数据缺失影响到线上业务,那么可通过数据质量API来集成DataWorks数据质量监控能力,当表数据产出异常时,可以立刻触发给规则订阅人。

        CreateQualityRuleRequest request = new CreateQualityRuleRequest();request.setBlockType(0);request.setComment("test-createTemplateRuleSuccess");request.setCriticalThreshold("50");request.setEntityId(entityId);request.setOperator("abs");request.setPredictType(0);request.setProjectName(PROJECT_NAME);request.setProperty("table_count");request.setPropertyType("table");request.setRuleName("createTemplateRuleSuccess");request.setRuleType(0);request.setTemplateId(7);request.setWarningThreshold("10");try {CreateQualityRuleResponse response = client.getAcsResponse(request);Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));Long templateRuleId = Long.parseLong(data.toString());Assert.assertTrue(templateRuleId > 0);return templateRuleId;} catch (Exception e) {e.printStackTrace();Assert.assertFalse(true);return null;}

CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode等数据质量API集可管理数据质量规则,更多数据质量API可参考DataWorks OpenAPI文档。

步骤六:生成数据服务API

我们通过元数据API完成了表创建,通过数据开发API完成文件和周期任务创建,通过数据质量和运维中心API配置好了监控规则,MaxCompute分区表数据亦可顺利产生,这时候我们还需要最后一个步骤把MaxCompute分区表的数据通过数据服务OpenAPI生成一个数据服务API向系统提供数据服务。

        CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();createRequest.setTenantId(tenantId);createRequest.setProjectId(projectId);createRequest.setApiMode(apiMode);createRequest.setApiName(apiName);createRequest.setApiPath(apiPath);createRequest.setApiDescription("test");createRequest.setGroupId(groupId);createRequest.setVisibleRange(visibleRange);createRequest.setTimeout(10000);createRequest.setProtocols(protocols);createRequest.setRequestMethod(requestMethod);createRequest.setResponseContentType(responseType);CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);Long apiId = createResponse.getData();Assert.assertNotNull(apiId);GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();getRequest.setTenantId(tenantId);getRequest.setProjectId(projectId);getRequest.setApiId(apiId);GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);GetDataServiceApiResponse.Data data = getResponse.getData();Assert.assertEquals(apiId, data.getApiId());Assert.assertEquals(0L, data.getFolderId().longValue());

使用CreateDataServiceApi、PublishDataServiceApi可把表数据转换成数据服务API,那么整个数据生产链路就完成了,集成以上的DataWorks OpenAPI即完成了本地系统和云上系统的无缝对接。

API调试小工具

DataWorks发布的所有API全部可在线调试,并以可见即所得的方式产生源码,这样可大大提高OpenAPI的开发效率,强烈推荐使用。DataWorks OpenAPI调试入口>>

总结

工欲善其数,必先利其器!DataWorks OpenAPI是2020年正式发布的企业数据开发提效神器。通过OpenAPI的方式,能够极大地提高企业使用DataWorks产品能力的灵活性。目前已发布150+个OpenAPI,并且还在持续增加中。本期实战旨在帮助企业用户了解如何快速上手DataWorks OpenAPI的实践应用,通过场景化的实战演练体验DataWorks OpenAPI的强大能力。实战系列内容持续更新中,感谢大家的关注!

关于Dataworks OpenAPI开通要求和开放地域可查阅DataWorks OpenAPI概述
限DataWorks企业版及以上使用立即开通

开通7天试用与折扣请使用钉钉扫码联系

原文链接:https://developer.aliyun.com/article/781486?

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

从实战中了解数据开发全流程——DataWorks OpenAPI实战相关推荐

  1. 一文解锁华为云新技能-AIOT开发全流程【设备接入-ESP端侧数据收集[MQTT]-实时数据分析】(步步截图较详细)

    一文解锁华为云新技能-AIOT开发全流程[设备接入-ESP端侧数据收集[MQTT]-实时数据分析](步步截图较详细) 在这篇你将会接触到:从物联网工程师从硬件层-通信层到应用层全流程:开发华为云最基本 ...

  2. 智能门锁开发全流程分享

    本文分享自中移OneOS微信公众号<智能门锁开发全流程>,作者:小O. <<<Python是计算机中一门公认的简单并且容易学习的语言,它的语法简单友好,拥有丰富的库资源和 ...

  3. MindSpore框架TBE算子开发全流程

    本文为MindSpore框架TBE算子开发全流程的图文案例.         视频案例请移步MindsSpore框架TBE算子开发全流程 MindSpore框架TBE算子开发全流程 一.工具介绍 1. ...

  4. 菜鸟看的Android应用开发全流程

    给菜鸟看的Android应用开发全流程--好多Android开发中,没人告诉过你的事 很多菜鸟开始学习Android开发,去网上搜集过很多"Android开发教程",但是搜索出来的 ...

  5. 国内征信行业模型开发全流程详解

    1. 前言 目前国内的金融体系主要由银行.互联网消费金融.助贷机构组成,本人参与过国内外大型银行.消金.助贷机构的征信模型开发,相对而言,对当前国内的征信模型具有一定的发言权.下面,我将从技术角度全面 ...

  6. 深度学习大厂前端项目开发全流程全流程

    用户审美的要求越来越高,也就使得越来越多的公司和企业注重线上用户的体验,都想要向外界传达出众的气质形象和重要信息,所以,Web前端人员的需求也越来越大. 深度学习大厂项目开发全流程全流程 在国外企业, ...

  7. RPD快速产品开发全流程详解

    一.什么是RPD? RPD定义-Rapid Product Development(快速产品开发): 借鉴了业界主流的产品开发流程:IPD.敏捷开发等: 结合企业当前软硬件开发实践,RPD是包括了思想 ...

  8. 【微信小程序开发全流程】篇章0:基于JavaScript开发的校园综合类微信小程序的概览

    基于JavaScript开发的校园综合类微信小程序的概览 本文仅供学习,未经同意请勿转载 一些说明:上述项目来源于笔者我本科大三阶段2020年电子设计课程项目,在这个项目中,我主要是负责的部分有前端, ...

  9. 次世代游戏美术资源开发全流程及常用的软件

    次世代游戏美术资源开发全流程案例 外国艺术家FlyCat用Blender完成的次世代悟空(胸像)全流程 外国艺术家FlyCat用Blender完成的次世代悟空(胸像)全流程 流程图

最新文章

  1. 分享:Dlib 17.49 发布,跨平台 C++ 通用库
  2. python实习做什么工作-大一/大二学生Python实习的困惑?
  3. Django2.2 pymysql 连接mysql数据库的坑
  4. 利用小工具instsrv和srvany 创建windows服务
  5. Linux 命令行上执行多个命令(分隔符简介使用)
  6. 用C++实现单链表的创建、逆置和输出 的两种方法
  7. zTree v2.6 - v3.0 文件对比
  8. hdu - 3415 Max Sum of Max-K-sub-sequence
  9. 纯CSS实现帅气的SVG路径描边动画效果
  10. 无法卸载_六月累积更新又出问题:打印机故障 部分程序无法打开和卸载
  11. vpc数量上限_服务器虚拟机最大数量限制
  12. Android-HandlerThread详解
  13. ADB工具华为鸿蒙,adb工具包华为版
  14. oracle讲表通过主键去重,数据库试题,数据库基础试题及答案
  15. 用线段树写Dijkstar
  16. oracle热备检查,oracle 手动热备
  17. 2017-08-25阿里校招笔试题---菜鸟仓库
  18. 程序员的英文代号_构建一个代号为1的聊天应用程序4
  19. Flutter 使用 ESC/POS蓝牙或以太网库控制热敏打印机
  20. win10更新失败 无法安装 Windows,因为这台电脑的磁盘布局不受UEFI固件支持

热门文章

  1. Python可视化 | Matplotlib绘制圆环图的两种方法!
  2. 计算机主机内置的地址码被称为,2016年职称计算机考试WPS_Office单选练习试题1
  3. php 我已阅读并同意 判断,phb.php
  4. java中浮点数的表示_java 浮点数表示法
  5. oracle12c导11g,Oracle12c的数据库如何向11g导入
  6. 【Hadoop应用案例】针对运营商支付业务的渠道推荐系统
  7. ubuntu 构建 deb 安装包
  8. [Alpha阶段]第六次Scrum Meeting
  9. HttpClient-01基本概念
  10. linux基础命令学习(四)用户与群组