Dask源码剖析是一个专栏,更多章节请点击文章列表查看。后续我会更新更多内容上来。

文章目录

  • Collection:Array
    • Array的创建
      • from_array加载

Collection:Array


前面我们了解了Bag数据模型。通过阅读Bag的源码我们大概熟悉了Dask的数据模型的套路。操作有加载,有计算,都是先转成Delayed,并按照逻辑构建成Graph。真正的计算得到结果需要执行compute提交到集群或在本地执行,并且dask很智能的会graph进行优化。

本节我们看下Array数据结构。
从源码目录里,我们可以很快的定位到Array的包:

就在dask源码包的下面,有个array的包。而其核心代码都在core.py中。

还是老规矩,先构建一些简单的demo,然后根据demo通过debug的形式看下是如何实现的。(例子来自dask-tutorial)

Array的创建

这回dask-tutorial给的示例是构建了一套demo的数据集,先运行以下,生成random.hdf5

%run prep.py -d random
# 通过三方库(可pip install h5py获取) h5py 加载数据集
# 下述操作只是获取了一个h5py文件指针,并不会把所有数据加载到内存。
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
# 数据没实际加载到内存,
dset = f['/x']

from_array加载

import dask.array as da
x = da.from_array(dset, chunks=(1000000,))
x

通过dask array提供的from_array函数可以构建Array对象。但hdf5里面的数据是和bag的from_sequence函数一样加载到内存里了,还是只是构建了一个delayed对象呢?
我们看下from_array的源码:

# dask/array/core.py
def from_array(x,chunks="auto",name=None,lock=False,asarray=None,fancy=True,getitem=None,meta=None,
):""" 从一个长得像array的对象中,创建一个dask array对象。什么叫长得像array的对象?其实就是像python原生的数组、np.ndarray,这些具备数组行为的对象。具体行为(方法)必须有``.shape``, ``.ndim``, ``.dtype``,然后还要支持numpy风格的切片(slicing)参数:----------x : 具备数组行为的对象chunks : int, tuple如何对数组进行分块. 必须是下述形式之一:- 传类似1000这样的整数,即分解成每个维度长度为1000的块,比如2250*2750,会分解成(3*3)个(1000,1000)的块.- 按shape进行分块,例如 (1000, 1000).- 像((1000, 1000, 500), (400, 400))这种,在每个维度上给出每块分块大小.- 指定每块的大小, 比如 "100 MiB" - 传入 "auto" 会使用 ``array.chunk-size`` 作为分块大小.- 传入-1 或 None 则作为1整块处理,不进行分块.name : str, optionalarray的名称. 默认是字符 ``x``的hash.默认情况下,使用Python标准的sha1算法. 当安装了 cityhash, xxhash 或 murmurhash.会变成相应的算法,这些算法在大规模的特征标记(创建token化的name时)可以获得更好的性能。当使用 ``name=False`` 会使用随机名称代替hashing.. 注意::由于name会被用于graph的key,所以必须保证key的唯一性lock : bool or Lock, optional如果 ``x`` 不支持并行读,那么传入True(Dask会帮你创建)或者指定的Lock对象。asarray : bool, optionalasarray在numpy中,会将类似array的数据结构转换成ndarray(是直接值引用,也就是说不会新开辟内存)。这种操作可以很方便的让任意类array的数据结构使用numpy的特性。dask的asarray与之类似,这个参数默认为None,当__array_function__未定义时,效果类似于将这个参数设置为True,即将各个分块转换成numpy array。而当asarray为False的时候,分块不转换成ndarray。fancy : bool, optionalfancy indexing是numpy的概念,很简单:即指传递索引数组以便一次得到多个数组元素。使用fancy indexing时要特别注意的一点是返回数组的shape反映的是索引数组的shape而不是被索引的原数组的shape。如果 ``x`` 不支持 fancy indexing (例如 用list或array做索引) 那么此参数为 False. 默认是 True.meta : Array-like, optionaldask array 结果的元数据.  一般由输入数组的slicing(分片操作)得到默认是输入数组."""if isinstance(x, Array):raise ValueError("Array is already a dask array. Use 'asarray' or " "'rechunk' instead.")elif is_dask_collection(x):warnings.warn("Passing an object to dask.array.from_array which is already a ""Dask collection. This can lead to unexpected behavior.")# 像例子中hdf5对象是不会在此转numpy的,否则相当于在client端加载数据了。if isinstance(x, (list, tuple, memoryview) + np.ScalarType):x = np.array(x)if asarray is None:# 例子中的hdf5是不具备__array_function__方法的,后续会借助numpy的asarray转ndarrayasarray = not hasattr(x, "__array_function__")previous_chunks = getattr(x, "chunks", None)# 这里chunk是一个tuple,存储了各chunks的分块大小chunks = normalize_chunks(chunks, x.shape, dtype=x.dtype, previous_chunks=previous_chunks)# 命名if name in (None, True):token = tokenize(x, chunks)original_name = "array-original-" + tokenname = name or "array-" + tokenelif name is False:original_name = name = "array-" + str(uuid.uuid1())else:original_name = name# 是否加锁,例子里是支持并行读的,所以没有加锁,lock为Falseif lock is True:lock = SerializableLock()# Always use the getter for h5py etc. Not using isinstance(x, np.ndarray)# because np.matrix is a subclass of np.ndarray.if type(x) is np.ndarray and all(len(c) == 1 for c in chunks):# No slicing neededdsk = {(name,) + (0,) * x.ndim: x}else:# getitem实际是函数的参数,但是文档里并没有暴露,应该是暂时不希望用户传入此参数。# 实际上getitem是定义了如何从x里加载数据到内存np.ndarray的方法。# 一般来说不传的话,会有三种gettter方法:if getitem is None:if type(x) is np.ndarray and not lock:# simpler and cleaner, but missing all the nuances of gettergetitem = operator.getitemelif fancy:# hdf5文件会通过此getter来获取数据,getter的代码我们在下面解读一下。getitem = getterelse:# 如果fancy参数为false,会用此逻辑获取getitem = getter_nofancy# 这里生成了graph,getem是此处的关键,代码我们在下面解读一下。dsk = getem(original_name,chunks,getitem=getitem,shape=x.shape,out_name=name,lock=lock,asarray=asarray,dtype=x.dtype,)dsk[original_name] = x# TileDB(一种分布式格点数据存储数据库),这里dask.array提供了对其的支持,可以看到和其他数据源的区别在于:# TileDB并没有把x作为meta,这里暂不深究。if x.__class__.__module__.split(".")[0] == "tiledb" and hasattr(x, "_ctx_"):return Array(dsk, name, chunks, dtype=x.dtype)# 和文档中提到的一样,如果meta元数据为空,则把它本身赋值给metaif meta is None:meta = xreturn Array(dsk, name, chunks, meta=meta, dtype=getattr(x, "dtype", None))# dask/array/core.py
def getter(a, b, asarray=True, lock=None):# a:数据源,b:sliceif isinstance(b, tuple) and any(x is None for x in b):b2 = tuple(x for x in b if x is not None)b3 = tuple(None if x is None else slice(None, None)for x in bif not isinstance(x, Integral))# 通过递归,解决多维度获取分块的问题return getter(a, b2, asarray=asarray, lock=lock)[b3]if lock:lock.acquire()try:c = a[b]if asarray:# 转numpy的ndarrayc = np.asarray(c)finally:if lock:lock.release()return c
def getem(arr,chunks,getitem=getter,shape=None,out_name=None,lock=False,asarray=True,dtype=None,
):"""dask从各种array-like的数据中获取分块数据的方法,其中getter即上面提到的3类getter之一。>>> getem('X', chunks=(2, 3), shape=(4, 6))  # doctest: +SKIP{('X', 0, 0): (getter, 'X', (slice(0, 2), slice(0, 3))),('X', 1, 0): (getter, 'X', (slice(2, 4), slice(0, 3))),('X', 1, 1): (getter, 'X', (slice(2, 4), slice(3, 6))),('X', 0, 1): (getter, 'X', (slice(0, 2), slice(3, 6)))}>>> getem('X', chunks=((2, 2), (3, 3)))  # doctest: +SKIP{('X', 0, 0): (getter, 'X', (slice(0, 2), slice(0, 3))),('X', 1, 0): (getter, 'X', (slice(2, 4), slice(0, 3))),('X', 1, 1): (getter, 'X', (slice(2, 4), slice(3, 6))),('X', 0, 1): (getter, 'X', (slice(0, 2), slice(3, 6)))}"""out_name = out_name or arrchunks = normalize_chunks(chunks, shape, dtype=dtype)# 这里会生成一个迭代器,输出(out_name,0、1、2...)作为keykeys = product([out_name], *(range(len(bds)) for bds in chunks))# 这里会构建slices的列表,标记start/end,便于分块slices = slices_from_chunks(chunks)if (has_keyword(getitem, "asarray")and has_keyword(getitem, "lock")and (not asarray or lock)):values = [(getitem, arr, x, asarray, lock) for x in slices]else:# Common case, drop extra parametersvalues = [(getitem, arr, x) for x in slices]# ok,这里会将(name,分块数作为key),对应的(getitem,arr,x)作为value,getitem暂时不会调用,# 会作为delayed对象后续执行按需调用。# arr是数据源,x在这里是slice,包含了重要的start/end信息。回过头来可以再看下上面getitem的源码,便于更深入理解。return dict(zip(keys, values)

上述整个从h5py对象到da.array的过程,会生成一个graph,用思维导图表示即:

这样我们就通过from_array成功构建了一个dask-array对象。其在jupyter里的展示还是很友好的:

小结一下,dask array数据结构与bag没有太大差异,也是将数据转换成了graph。如果传入的x支持异步加载(如hdf5),那么实际数据仍然是懒加载的。
其他的构建方式还有

  • from_zarr
  • from_tiledb
  • from_npy_stack
  • from_delayed
    这里就不一一介绍了,大致原理一致

03 Dask源码剖析-Dask的数据模型-Array相关推荐

  1. 01 Dask源码剖析-Dask的数据模型-Delayed

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看.后续我会更新更多内容上来. 文章目录 Dask的数据模型概述 Collections:Delayed delayed函数 Delayed对象和D ...

  2. 02 Dask源码剖析-Dask的数据模型-Bag

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看.后续我会更新更多内容上来. 文章目录 Collection:Bag Bag的创建 从内存序列创建(from_sequence) 从文本文件创建( ...

  3. C++ STL源码剖析 tr1与std array

    C++ STL源码剖析 tr1与std array 深入底层,层层剖析by 光城 0.导语 源码剖析版本为gcc4.9.1. C++ tr1全称Technical Report 1,是针对C++标准库 ...

  4. LinkedList源码剖析

    1. LinkedList简介 LinkedList是基于双向循环链表(从源码中可以很容易看出)实现的,除了可以当作链表来操作外,它还可以当作栈,队列和双端队列来使用. LinkedList同样是非线 ...

  5. boost源码剖析之:泛型函数指针类boost::function(rev#3)

    boost源码剖析之:泛型函数指针类boost::function(rev#3) 刘未鹏 C++的罗浮宫(http://blog.csdn.net/pongba)   Note: 并非新作,03年曾放 ...

  6. 《GDAL源码剖析与开发指南》导读

    前言 GDAL源码剖析与开发指南 GDAL全称是Geospatial Data Abstraction Library(地理空间数据抽象库),是一个在X/MIT许可协议下读写空间数据(包括栅格数据和矢 ...

  7. 【C++标准库】std::string用法指南源码剖析

    文章目录 1.ASCII码 (1)计算机如何表达字符 2.C 语言中的字符类型 char (1)思想:char 即整数 (3)C 语言帮手函数 (4)C语言中的字符串 (4)C 语言转义符 3.C++ ...

  8. Python源码剖析[19] —— 执行引擎之一般表达式(2)

    Python源码剖析 --Python执行引擎之一般表达式(2) 本文作者: Robert Chen(search.pythoner@gmail.com ) 3.2     Simple.py 前面我 ...

  9. 老李推荐:第14章4节《MonkeyRunner源码剖析》 HierarchyViewer实现原理-装备ViewServer-端口转发 1...

    老李推荐:第14章4节<MonkeyRunner源码剖析> HierarchyViewer实现原理-装备ViewServer-端口转发 在初始化HierarchyViewer的实例过程中, ...

最新文章

  1. 视频教学动作修饰语:CVPR2020论文解析
  2. java中读取文件的方法
  3. 深度协同过滤:用神经网络取代内积建模
  4. java随机抽题系统_在用java做一个在线考试系统,随机抽题遇到了问题,我写了一个随机抽题的方法,不知道在asp按钮中怎么用...
  5. vue中如何在地图中标点…
  6. 用Redis存储Tomcat集群的Session
  7. grpc-go客户端源码分析
  8. 关于atollic truestudio for stm32
  9. php的链接查询,php – 使用指向另一个查询的链接运行查询.
  10. shit!Vxworks!Shit!WorkBench!
  11. adb Error: failed to write; /data/local/tmp/??.apk (No such file or directory)
  12. Peewee的基本使用
  13. JVM和操作系统的关系是什么?
  14. (转载)有关推挽输出、开漏输出、复用开漏输出、复用推挽输出以及上拉输入、下拉输入、浮空输入、模拟输入区别...
  15. 用style标签的background-image属性 改变图片大小
  16. 计算机组装师分为哪几步,电脑组装主要需要学习哪几个方面?难学吗?
  17. HSM(安全管理平台)
  18. 【docker】docker常用命令总结
  19. java计算机毕业设计小说阅读网站源码+系统+数据库+lw文档+mybatis+运行部署
  20. 【Bigger】如何区分三个大数据热门职业

热门文章

  1. MonkeyRunner API(一)
  2. 育儿宝宝参考(身高)
  3. 个人网站搭建 03——Hexo + Github 博客搭建
  4. android viewholder静态,android – 静态ViewHolder并在使用RecyclerView时获取上下文
  5. 【安装教程】——安装显卡驱动
  6. ofice2007 没有下拉框模糊查询功能
  7. Mosquitto 权限管理
  8. 安信可分享 | 分享一个基于airkiss协议的配网小程序,实现小程序一键配网安信可ESP32C3\ESP8266\ESP32\ESP32S2系列的模组。(附带源码)
  9. Mongodb 被忽略的 数据类型 索引种类 与限制 与如何导向开发者 (1 常用数据类型)...
  10. java异常[java.util.regex.patternsyntaxexception dangling meta character ‘+‘ near index]解决