文章目录

  • 1. 阿里云 python sdk的安装和使用
    • 1.1 PIP安装:
    • 1.2 使用AccessKey简单调用ECS
  • 2. Datahub Python SDK的安装和使用
    • 2.1 安装pydatahub
    • 2.2 使用pydatahub调用Datahub SDK
      • 2.2.1 导入pydatahub库包
    • 2.2.2 连接Datahub
    • 2.2.3 创建新的project
      • 查看已有的project
    • 2.2.4 创建新的Topic
      • 2.2.4.1 Tuple类型的Topice创建
      • 2.2.4.2 BLOB类型的Topice创建
    • 2.2.5 获取shard列表
    • 2.2.6 发布数据
      • 2.2.6.1 发布TUPLE数据
      • 2.2.6.2 发布BLOB数据

1. 阿里云 python sdk的安装和使用

参考:

阿里云python sdk的使用: https://help.aliyun.com/document_detail/67117.html?spm=a2c4g.11186623.6.544.4b3042c5OiDjys

阿里云region id 地域和可用区:https://help.aliyun.com/document_detail/40654.html?spm=5176.10695662.1996646101.1.46b36dd0Sy8kEd

1.1 PIP安装:

pip install aliyun-python-sdk-core # 安装阿里云SDK核心库
pip install aliyun-python-sdk-ecs # 安装管理ECS的库

1.2 使用AccessKey简单调用ECS

  1. 导入相关产品的SDK (此处为ECS )

    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ClientException
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkecs.request.v20140526 import DescribeInstancesRequest
    from aliyunsdkecs.request.v20140526 import StopInstanceRequest
    
  2. 新建一个AcsClient

    client = AcsClient("<your-access-key-id>", "<your-access-key-secret>","<your-region-id>"
    );
    

    注意:Region ID 为你购买相应产品时选择的区域ID, 详见 Region ID文档。

    国内Region ID 表:

    地域名称 所在城市 Region ID 可用区数量
    华北 1 青岛 cn-qingdao 2
    华北 2 北京 cn-beijing 8
    华北 3 张家口 cn-zhangjiakou 3
    华北 5 呼和浩特 cn-huhehaote 2
    华北 6 乌兰察布 cn-wulanchabu 2
    华东 1 杭州 cn-hangzhou 8
    华东 2 上海 cn-shanghai 7
    华南 1 深圳 cn-shenzhen 5
    华南 2 河源 cn-heyuan 2
    西南 1 成都 cn-chengdu 2
    中国香港 香港 cn-hongkong 2
  3. 创建Request对象。

    request = DescribeInstancesRequest.DescribeInstancesRequest()
    request.set_PageSize(10)
    
  4. 发起调用并处理返回。

    try:response = client.do_action_with_exception(request)print(response)
    except ServerException as e:print(repr(e))
    except ClientException as e:print(repr(e))
    

2. Datahub Python SDK的安装和使用

参考:

Datahub名词解释及官方文档: https://help.aliyun.com/document_detail/158776.html?spm=a2c4g.11186623.6.545.3af9329a3eUfXg

Python库pydatahub安装指南:https://pydatahub.readthedocs.io/zh_CN/latest/installation.html

pydatahub在github源码及Demo: https://github.com/aliyun/aliyun-datahub-sdk-python/tree/master/examples

2.1 安装pydatahub

pip install pydatahub

2.2 使用pydatahub调用Datahub SDK

2.2.1 导入pydatahub库包

import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType

2.2.2 连接Datahub

access_key_id = "阿里云accesskey id" # 阿里云accesskey id
access_key_secret = "阿里云accesskey secret" # 阿里云accesskey secret
end_point = "endpoint地址" # endpoint地址dh = DataHub(access_key_id, access_key_secret, end_point)

注意: 此处的endpoint对应的是相应阿里云产品的endpoint互联网地址。详见endpoint管理和介绍

对应此处,endpoint则是Datahub控制台中的互联网ID ,查看路径: Datahub数据总线控制台 > 概览 > 常用信息 > 外网(互联网)

2.2.3 创建新的project

project_name = 'test_py_sdk'
comment = 'test creating project with python sdk!'
try:dh.create_project(project_name, comment)print("create project '{}' success!".format(project_name))
except Exception as e:print(repr(e))

查看已有的project

dh.list_project()

2.2.4 创建新的Topic

2.2.4.1 Tuple类型的Topice创建

topic_name = "test_py_sdk_topic_tuple"
shard = 1
life_cycle = 3
record_schema = RecordSchema.from_lists(['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],[FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
topic_comment = "test tuple topic"try:# 调用创建命令dh.create_tuple_topic(project_name, topic_name, shard, life_cycle, record_schema, topic_comment)print("create tuple topic '{}' success!".format(topic_name))
except Exception as e:print(repr(e))

注意:Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前阿里云的python SDK仅仅支持以下几种类型(Java SDK支持更多的, 可能是觉得python不配?):

类型 含义 值域
Bigint 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 -9223372036854775807 ~ 9223372036854775807
String 字符串,只支持UTF-8编码。 单个String列最长允许1MB。
Boolean 布尔型。 可以表示为True/False,true/false, 0/1
Double 8字节双精度浮点数。 -1.0 10308 ~ 1.0 10308
TimeStamp 时间戳类型 表示到微秒的时间戳类型

测试的话,此处shard设置为1就好了,毕竟是收钱的,按个收费。生产使用也要尽量评估shard的使用个数。

以下是关于datahub 的限制描述(可根据此表评估shard使用数量)

限制项 描述 值域范围
活跃shard数 每个topic中活跃shard数量限制 (0,256]
总shard数 每个topic中总shard数量限制 (0,512]
Http BodySize http请求中body大小限制 4MB
单个String长度 数据中单个String字段长度限制 2MB
Merge/Split频率限制 每个新产生的shard在一定时间内不允许进行Merge/Split操作 5s
QPS限制 每个Shard写入QPS限制(非Record/s,Batch写入同一Shard仅计算为1次) 2000
Throughput限制 每个Shard写入每秒吞吐限制 5MB/s
Project限制 每个云账号能够创建的Project上限 50
Topic限制 每个Project内能创建的Topic数量限制,如有特殊请求请联系管理员 500
Topic Lifecycle限制 每个Topic中数据保存的最大时长,单位是天 [1,7]

2.2.4.2 BLOB类型的Topice创建

topic_name = "test_py_sdk_topic_bolb"
shard_count = 3
life_cycle = 7
topic_comment = "creating blob topic by using python sdk"try:# 调用创建命令dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, topic_comment)print("create blob topic '{}' success!".format(topic_name))
except Exception as e:print(repr(e))

2.2.5 获取shard列表

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))
print(shards)

返回结果是一个ListShardResult对象,包含一个Shard对象的list,list中的每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

2.2.6 发布数据

发布数据前需要等候shard不在传输状态。

2.2.6.1 发布TUPLE数据

topic_name = "test_py_sdk_topic_tuple"
project_name = "test_py_sdk"
# 等候shards准备好
dh.wait_shards_ready(project_name, topic_name)
# 获取topic
topic_result = dh.get_topic(project_name, topic_name)
# 查看record的类型
print(topic_result.record_type)
# 查看此topic的schema信息,发布到topic的数据需要类型一致、字段类型(schema)一致
print(topic_result.record_schema)
# 获取topic的shema信息
record_schema =  topic_result.record_schema

写一个生成测试tuple 数据的函数:

def gen_tuple_data(num):"""生成10的num次方条tuple数据"""res = []for i in range(10**num):data = [i, "yc"+str(i),i/10.01, True,1455869335000000]res.append(data)return res

发布TUPLE数据的函数:

def publish_tuple_data(dh, project_name, topic_name, records):topic_result = dh.get_topic(project_name, topic_name)record_schema = topic_result.record_schematuple_recs = []for rec in records:tuple_rec = TupleRecord(schema=record_schema, values=rec)tuple_recs.append(tuple_rec)return dh.put_records(project_name, topic_name, tuple_recs)

发布十条数据到tuple类型的topic

tuple_data = gen_tuple_data(1)
publish_result = publish_tuple_data(dh, project_name, "test_py_sdk_topic", tuple_data)

发布一万条数据到tuple类型的topic

tuple_data = gen_tuple_data(4)
publish_result = publish_tuple_data(dh, project_name, "test_py_sdk_topic", tuple_data)

若是学习使用的话,此步慎重!!不然明天收到缴费单。

另,同一个shard一次不要传入大于4MB的数据,不然会返回413错误,当数据量过大时,控制好数据数量,做一下切片。

发布TUPLE数据的其他操作:

records0 = []
# 方法一
record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
# 设置shard
record0.shard_id = '0'
record0.put_attribute('AK', '47')
records0.append(record0)
# 方法二
record1 = TupleRecord(schema=record_schema)
record1.set_value('bigint_field', 2)
record1.set_value('string_field', 'yc2')
record1.set_value('double_field', None)
record1.set_value('bool_field', False)
record1.set_value('time_field', 1455869335000011)
record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
records0.append(record1)
# 方法三
record2 = TupleRecord(schema=record_schema)
record2.set_value(0, 3)
record2.set_value(1, 'yc3')
record2.set_value(2,  1.1)
record2.set_value(3, False)
record2.set_value(4, 1455869335000011)
record2.attributes = {'key': 'value'}
record2.partition_key = 'TestPartitionKey'
records0.append(record2)
# 发布数据
put_result = dh.put_records(project_name, topic_name, records0)
# 发布结果
print(put_result)

2.2.6.2 发布BLOB数据

topic_name = 'test_py_sdk_topic_bolb'
project_name = "test_py_sdk"
records1 = []
record3 = BlobRecord(blob_data='data')
record3.shard_id = '0'
record3.put_attribute('a', 'b')
records1.append(record3)
put_result = dh.put_records(project_name, topic_name, records1)
put_result

看到这里点个赞吧,谢谢:)

【阿里云数据总线】Datahub使用Python SDK记录相关推荐

  1. 阿里云资源编排服务Python SDK使用入门

    阿里云资源编排服务(ROS)为我们维护云计算资源提供了一个低成本.可靠.标准化的方案.基于ROS提供的能力,我们只要编写和维护资源模板文件,就可以达到维护云计算资源的目的,而不再需要花费很多的时间通过 ...

  2. 阿里云视频点播的使用(SDK调用示例的运行)

    阿里云视频点播的使用(SDK调用示例的运行) 本文着重介绍阿里云视频点播的SDK的使用,实现这SDK方式的视频播放凭证和视频播放地址的获取. 阿里云视频点播SDK的使用方法 一:准备工作 1.给Acc ...

  3. 如何在阿里云上搭建个人网站(学习记录)

    如何在阿里云上搭建个人网站(学习记录) 第一次写博客记录学习的过程,不仅可以巩固学习知识,也方便日后复习.并且可以记录自己的成长. 先购买阿里ECS云服务器 购买网站https://promotion ...

  4. 利用MQTT协议与阿里云数据交互的python程序

    利用MQTT协议与阿里云数据交互的python程序 设计目的 功能要求和关键问题 环境配置问题 本地程序如何连接云上设备 云上的数据交互问题 界面设计问题 阿里云相关操作 本地程序 设计目的 设计开发 ...

  5. python海龟图画龙珠_阿里云天池龙珠计划——Python基础入门第2课:变量和赋值...

    阿里云天池龙珠计划--Python基础入门第2课:变量和赋值 [我是测试题2]下面这段代码的运行结果是什么? # 运行一下结果就出来了 a = "hello" b = " ...

  6. 关于阿里云服务器配置Ubuntu18.04+python环境

    关于阿里云服务器配置Ubuntu18.04+python环境 需要准备的软件有以下 Xshell Xftp 云服务器(这里以阿里云为例) 环境配置文件(附上下载链接) 点击前往git下载环境配置文件 ...

  7. 阿里云服务器centos7配置Python Django项目

    阿里云服务器centos7配置Python Django项目 2020-3-25更新更换服务器后还原系统+相应配置 2019-12-13更新 Nginx域名绑定 2019-12-12更新 静态文件加载 ...

  8. 阿里云SMTP邮件发送Python demon实践

    阿里云SMTP邮件发送Python & Django实践 说明: python 3.6 Django 2.2 阿里云轻应用服务器 注意:轻应用服务器上80 端口和25端口是被占用,465端口被 ...

  9. 阿里云数据盘扩容操作遇险记录

    阿里云数据盘扩容操作记录 阿里云数据盘40G的容量已经用了37G了,不扩容的话,肯定是不行了.于是开始准备扩容. 但是之前没有做过这样的操作.心中有点怕怕的,如果数据丢了可咋整. 不管,先创建一个快照 ...

最新文章

  1. 关于vivo 8.0和miui新系统android studio调试出现“包解析错误”的bug的解决办法
  2. 网络营销——网络营销专员表明网站地图助力网站收录进一步提升
  3. Colored Boots
  4. 试验设计与matlab数据分析 下载,试验设计与MATLAB数据分析(附光盘)
  5. 深入理解支持向量机(SVM)
  6. VUE 入门基础(3)
  7. yolov3为什么对大目标检测不好_从YOLOv1到YOLOv3,目标检测的进化之路
  8. MySql 中的=操作符
  9. 第八章 OGRE中合成器(也就是传说中的image-based rendering)以及如何在OGRE中对着色器的uniform变量传值
  10. Windows由于在创建转储期间出错,创建转储文件失败导致的蓝底白字蓝屏重启,最全细解决方案
  11. 解决OpenCV4出现“/usr/bin/ld:cannot find -lcudart_static.../usr/bin/ld: cannot find -lx86_64-linux-gnu“问题
  12. zk可实现分布式锁,Redis也可实现,之间有什么区别?
  13. oracle查询sql保留小数点前0,去掉小数位后末尾的0
  14. 六张图了解Python的赋值、浅复制、深复制
  15. vue-tv-focusable
  16. prometheus + cadvisor + grafana 监控容器和服务器
  17. Kteer软件 创建.ktr文件
  18. 数据结构教程(第五版 李春葆 上机实验题3 验证性实验)
  19. Vscode 调试arm64 linux内核
  20. 单用户MIMO系统(二):信道信息在发端未知

热门文章

  1. 嵌入式人工智能唱响2020年中国嵌入式技术大会!
  2. 使用小米手机进行真机测试
  3. CSDN不友好的收藏夹
  4. 饥荒联机版修改在线模式
  5. d954(D9546)
  6. PHP如何把三张图片均匀分布,C++ uniform_real_distribution连续均匀分布类模板用法详解...
  7. 公司25k招了一个测试员不会自动化,试用期没过就赶走了...
  8. mysql中的整除和取余函数
  9. 安卓虚拟摄像头_iPhone 的第四颗摄像头位置,为什么给了激光雷达?
  10. B站成长期UP主有哪些涨粉机会?