Dask源码剖析是一个专栏,更多章节请点击文章列表查看。后续我会更新更多内容上来。

文章目录

  • Collection:Bag
    • Bag的创建
      • 从内存序列创建(from_sequence)
      • 从文本文件创建(read_text)
      • Bag的构成
    • Bag的一些行为(Manipulation)
      • .take(k,npartitions)
      • .filter(predicate)
      • .map(func, *args, **kwargs)
  • 总结

Collection:Bag


对于Bag数据模型,其实从Dask官方进行的用户调研情况来看,这种数据模型较其他数据模型使用的情况是最少的:

但是Bag是较为简单的数据模型,对于理解其他数据模型,比如DataFrame、Array,我认为是有帮助的。所以咱们本节就先了解一下Bag。

从源码目录里,我们可以很快的定位到Bag的包:

就在dask源码包的下面,有个bag的包。而其核心代码都在core.py中。

还是老规矩,先构建一些简单的demo,然后根据demo通过debug的形式看下是如何实现的。(例子来自dask-tutorial)

Bag的创建

从内存序列创建(from_sequence)

import dask.bag as db
# 从序列创建bag
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(3)

上述例子从长度为10的整数数组,npartitions=2即分成两片,创建了一个bag。然后.take(3)即从bag中获取3个元素。
在from_sequence插入断点,我们开始深入源码,step into,可以看到:

# dask/bag/core.py
def from_sequence(seq, partition_size=None, npartitions=None):""" 从序列创建一个dask.Bag。如果非用这个方法,这个序列在内存中不要太大,最好让Dask Bag自己加载你的数据。比如我们加载一个文件名序列放入Bag中,然后使用“.map”打开它们。Parameters----------seq: Iterable一个可迭代的序列,导入dask中partition_size: int (可选)每个分片的大小npartitions: int (可选)分片的数量最好提供 ``partition_size`` 或 ``npartitions``参数,但别两个参数都都填另外参考:--------read_text: 从文本文件创建bag"""seq = list(seq)if npartitions and not partition_size:partition_size = int(math.ceil(len(seq) / npartitions))# 如果两个参数都不给,就按100的分片大小去切割元素if npartitions is None and partition_size is None:if len(seq) < 100:partition_size = 1else:partition_size = int(len(seq) / 100)# 这里是对序列进行切分的地方,这里还是用了toolz三方库,# 在客户端把seq序列按partition_size(此处为2)进行切分parts = list(partition_all(partition_size, seq))# 这时候parts=[(1, 2, 3, 4, 5), (6, 7, 8, 9, 10)]# 参考上一节Delayed介绍,这里应该是要开始转异步任务了,先创建了任务的name:name = "from_sequence-" + tokenize(seq, partition_size)# name = 'from_sequence-4206ac43a7f088cbbf77f5dc46ca024c'if len(parts) > 0:# enumerate不熟的看下https://www.runoob.com/python/python-func-enumerate.html# 这里实际就是把name和parts的序号作为key,part内容作为value,存到字典里。d = dict(((name, i), list(part)) for i, part in enumerate(parts))# d长这样:# {('from_sequence-4206ac43a7f088cbbf77f5dc46ca024c', 0): [1, 2, 3, 4, 5],# ('from_sequence-4206ac43a7f088cbbf77f5dc46ca024c', 1): [6, 7, 8, 9, 10]}else:d = {(name, 0): []}# 最后创建Bag,这里先不跟进细看了,我们先看完其他的构建方法。return Bag(d, name, len(d))

从文本文件创建(read_text)

上面的从客户端(往往是在笔记本上)内存创建的bag,受限于客户端的硬件配置,不适合大数据的加载。所以dask提供了read_text更为常见的数据加载方式。这里还是用了教程给的例子:

# 构建Demo数据:
%run prep.py -d accounts

这个时候在教程文件夹的data文件夹下,多了很多accounts.*.json.gz的数据:

总共50个吧。每个都是经过GZIP压缩的。所以扩展名是gz。每个大小在500KB左右。我们开始加载数据:

import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)

通过debug,我们看下read_text内容:

# dask/bag/text.py
def read_text(urlpath,blocksize=None,compression="infer",encoding=system_encoding,errors="strict",linedelimiter=os.linesep,collection=True,storage_options=None,files_per_partition=None,include_path=False,
):""" 从文本文件中加载参数列表:----------urlpath : string 或 list 类型可以支持:绝对或相对路径、带有协议前缀的url,比如``s3://``。其他类型我们过会看代码吧。blocksize: None, int, or str这个比较有意思,把文件切割成多大的块(按bytes为单位)。None的话会根据流(如http的报文流)大小来切。也可以传个整数类型,或者"128MiB"这样的字符串compression: string文件的压缩格式,默认是根据文件自适应。encoding: stringerrors: stringlinedelimiter: stringcollection: bool, optional如果是True则返回dask.bag , 否则返回 delayed 数组storage_options: dict这块对于适配各种大数据平台比较有用,比如hdfs或s3的一些密码、host、port等等files_per_partition: None or int不设的话,一个文件一个分片(partition ),设了就按输入文件group 后再分片。这个参数和blocksize互斥。include_path: bool是否在Bag里包含path,是的话按元组 (line, path)构建bag,默认是不带path的例子-------->>> b = read_text('myfiles.1.txt')  # doctest: +SKIP>>> b = read_text('myfiles.*.txt')  # doctest: +SKIP>>> b = read_text('myfiles.*.txt.gz')  # doctest: +SKIP>>> b = read_text('s3://bucket/myfiles.*.txt')  # doctest: +SKIP>>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')  # doctest: +SKIP>>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')  # doctest: +SKIP将未压缩字节数(blocksize)加载到分片:>>> b = read_text('largefile.txt', blocksize='10MB')  # doctest: +SKIPinclude_path=True的情况:>>> b = read_text('myfiles.*.txt', include_path=True) # doctest: +SKIP>>> b.take(1) # doctest: +SKIP(('first line of the first file', '/home/dask/myfiles.0.txt'),)返回值-------dask.bag.Bag 或 listdask.bag.Bag 或 Delayed 列表。取决于collection传True还是False"""# 这两个参数互斥,注释里提到过if blocksize is not None and files_per_partition is not None:raise ValueError("Only one of blocksize or files_per_partition can be set")if isinstance(blocksize, str):blocksize = parse_bytes(blocksize)# 这里用了fsspec三方库,通过官方文档可以知道,这是一个key支持多种存储后端的文件操作库# 查了下fsspec的官方文档:https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations# 大概支持hdfs、gcs、s3、ftp等协议,当然也支持本地绝对路径与相对路径。files = open_files(urlpath,mode="rt",encoding=encoding,errors=errors,compression=compression,**(storage_options or {}))# 下面的逻辑是按照blocksize 或files_per_partition 进行分片。if blocksize is None:if files_per_partition is None:blocks = [delayed(list)(delayed(partial(file_to_blocks, include_path))(fil))for fil in files]else:blocks = []for start in range(0, len(files), files_per_partition):block_files = files[start : (start + files_per_partition)]block_lines = delayed(concat)(delayed(map)(partial(file_to_blocks, include_path), block_files,))blocks.append(block_lines)else:o = read_bytes(urlpath,delimiter=linedelimiter.encode(),blocksize=blocksize,sample=False,compression=compression,include_path=include_path,**(storage_options or {}))raw_blocks = o[1]blocks = [delayed(decode)(b, encoding, errors) for b in concat(raw_blocks)]if include_path:paths = list(concat([[path] * len(raw_blocks[i]) for i, path in enumerate(o[2])]))blocks = [delayed(attach_path)(entry, path) for entry, path in zip(blocks, paths)]if not blocks:raise ValueError("No files found", urlpath)# 是否把block的delayed转成Bag对象。if collection:blocks = from_delayed(blocks)return block

按照上述的例子加载50个json.gz,read_text我们实际上会按照一个文件一个block,最后把delayed构建成Bag对象中。用图表示Graph(DAG)如下:

回顾文本文件创建Bag的过程:

  1. 即从文件中逐行读取的操作,先从urlpath解析出文件fsspec对象(注意如果是分布式,其实是需要client和worker节点都可以访问到这些文件的路径的,client负责生成任务,取不到取不全这些本地路径是会有问题的,worker更不用说,加载的时候如果找不到文件也会有问题)
  2. 后续流程是转成了delayed
  3. 通过blocksize或files_per_partition对路径数组进行分片
  4. 通过file_to_blocks(生成器)按行读取文件
  5. 通过list获取生成器的值,转成list
  6. 将上述的delayed list作为Bag的Graph,对象
def file_to_blocks(include_path, lazy_file):with lazy_file as f:for line in f:yield (line, lazy_file.path) if include_path else line

Bag的构成

通过上述两种构建方法,Bag对象实际存储的是一个graph、它的name,还有就是它的分片(partition)数。而本身的数据,要么是内存中的元素数组,要么是获取元素的Delayed对象。

Bag的一些行为(Manipulation)

.take(k,npartitions)

上面例子中,最先被使用的,便是take方法了:

import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
# 获取3个元素
b.take(3)
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
# 获取1个元素
b.take(1)

下面我们看一下take,源码在dask/bag/core.py,这里就补贴源码了。
take接受4个参数:

  • k:获取元素的个数
  • npartitions:int型,可选,从多少个分片中获取元素(注意不是第几个),如果传-1就从所有分片中获取。
  • compute:bool型,可选,默认True,即计算返回结果
  • warn:bool型,如果传入的比实际Bag中有的元素个数多,发出告警

通过源码走读可以看到,take内部会创建一个新的Bag对象,而新的Bag对象(子Bag)是依赖于总的Bag的。如果获取元素的个数>1,会将结果通过toolz的concat函数拼接起来。子Bag的分片固定为1(毕竟take是想要获取结果了)。

那么问题来了,既然take创建的子Bag依赖于父Bag,那么是不是会把所有的元素(例如第二个例子里的50个json.gz)都加载到dask后,才能take呢?

如果从子Bag构建的graph看,是需要的,毕竟take依赖于父bag的bag-from-delayed-abd0…,而父Bag是会读取所有partitions的。但如果Dask真要这么搞,对大数据处理来说绝对是一场灾难,后面我也不会写下去了。通过跟源码我们发现,dask还是很智能的为我们解决了这个问题,其秘密就在take最后进行compute的时候,有optimize_graph这个选项。会对graph进行优化,以下截图自debug跟进到compute的时候:

优化后的dsk对象(要计算的graph)只会加载1个分片的文件。collections_to_dsk这一步是对其进行优化的关键,跟进去:

# dask/base.py
def collections_to_dsk(collections, optimize_graph=True, **kwargs):"""Convert many collections into a single dask graph, after optimization"""optimizations = kwargs.pop("optimizations", None) or config.get("optimizations", [])# 对graph进行优化# 假如optimize_graph改成False,那么是会对所有的Bag分片进行加载的。if optimize_graph:# 这里的optimization_function代码我粘贴到本函数后面.# 可以看到,真实的优化函数,会定义在__dask_optimize__属性里。也就说,# Bag模块有自己的优化函数groups = groupby(optimization_function, collections)_opt_list = []for opt, val in groups.items():_graph_and_keys = _extract_graph_and_keys(val)groups[opt] = _graph_and_keys# 调用优化函数,进行graph优化_opt_list.append(opt(_graph_and_keys[0], _graph_and_keys[1], **kwargs))for opt in optimizations:_opt_list = []group = {}for k, (dsk, keys) in groups.items():group[k] = (opt(dsk, keys), keys)_opt_list.append(opt(dsk, keys, **kwargs))groups = groupdsk = merge(*map(ensure_dict, _opt_list,))else:dsk, _ = _extract_graph_and_keys(collections)return dskdef optimization_function(x):return getattr(x, "__dask_optimize__", dont_optimize)

Bag的优化函数:

# dask/bag/core.py
def optimize(dsk, keys, fuse_keys=None, rename_fused_keys=None, **kwargs):""" Optimize a dask from a dask Bag. """dsk = ensure_dict(dsk)# 调用的公用的优化方法cull,就是在这里,Bag移除掉了不必要的加载。dsk2, dependencies = cull(dsk, keys)kwargs = {}if rename_fused_keys is not None:kwargs["rename_keys"] = rename_fused_keysdsk3, dependencies = fuse(dsk2, keys + (fuse_keys or []), dependencies, **kwargs)dsk4 = inline_singleton_lists(dsk3, keys, dependencies)dsk5 = lazify(dsk4)return dsk5

dask公用的优化方法:

# dask/optimization.py
def cull(dsk, keys):""" Return new dask with only the tasks required to calculate keys.In other words, remove unnecessary tasks from dask.``keys`` may be a single key or list of keys.Examples-------->>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}>>> dsk, dependencies = cull(d, 'out')  # doctest: +SKIP>>> dsk  # doctest: +SKIP{'x': 1, 'out': (add, 'x', 10)}>>> dependencies  # doctest: +SKIP{'x': set(), 'out': set(['x'])}Returns-------dsk: culled dask graphdependencies: Dict mapping {key: [deps]}.  Useful side effect to accelerateother optimizations, notably fuse."""if not isinstance(keys, (list, set)):keys = [keys]seen = set()dependencies = dict()out = {}work = list(set(flatten(keys)))while work:new_work = []for k in work:# 根据key找到真实依赖的任务# 由于take-xxxx实际依赖的并不是bag-from-delayed-xxx,而是# (bag-from-delayed-xxx,0),也就是说是可以定位到具体一个分片的# 所以可以理论上是可以对dask的graph进行裁剪的,裁剪方法大致逻辑就是用# (bag-from-delayed-xxx,0)作为key,在dsk字典里索引对应的子任务# 例如list、再索引到file-to-blockdependencies_k = get_dependencies(dsk, k, as_list=True)  # fuse needs listsout[k] = dsk[k]dependencies[k] = dependencies_kfor d in dependencies_k:if d not in seen:seen.add(d)new_work.append(d)work = new_workreturn out, dependencie

如果我们对optimize_graph改成False,那么dask是会把所有分片都加载到内存的。感兴趣的可以修改take的源码试一下:

# dask/bag/core.py
# def take(...):graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])b = Bag(graph, name, 1)# 改为False试一下if compute:return tuple(b.compute(optimize_graph=False))else:return b

.filter(predicate)

filter比较好理解,按照predicate(谓词)进行过滤,谓词函数会返回True或False,满足的结果会返回。

    def filter(self, predicate):""" Filter elements in collection by a predicate function.>>> def iseven(x):...     return x % 2 == 0>>> import dask.bag as db>>> b = db.from_sequence(range(5))>>> list(b.filter(iseven))  # doctest: +SKIP[0, 2, 4]"""# 可以看到,又是组件graph的三步,name、dsk、HighLevelGraphname = "filter-{0}-{1}".format(funcname(predicate), tokenize(self, predicate))dsk = dict(((name, i), (reify, (filter, predicate, (self.name, i))))for i in range(self.npartitions))graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])# 最后返回的仍是Bag对象,名字改为filter-xxx-xxx,分片数不变return type(self)(graph, name, self.npartitions)

.map(func, *args, **kwargs)

map其实是更通用一点的filter,如果说filter是调用了系统的filter结合传入的谓词函数实现的过滤,那么map也可以做类似的事情。
原理上也是根据传入的函数以及参数,构成graph。具体代码不再展开。

总结

  • Bag对象实际存储的是一个graph、它的name,还有就是它的分片(npartition)数。而本身的数据,要么是内存中的元素数组,要么是获取元素的Delayed对象。分片是把一个总的任务切分成多个子任务进行并行/分布式计算的根本。
  • Bag的行为(处理方法)是基于已有的graph,再进一步增加一些处理过程的graph。不会立即进行计算。
  • 真正的计算是graph触发compute时,进行一些优化(可选,默认是进行优化),选择调度器后才开始的。

02 Dask源码剖析-Dask的数据模型-Bag相关推荐

  1. 01 Dask源码剖析-Dask的数据模型-Delayed

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看.后续我会更新更多内容上来. 文章目录 Dask的数据模型概述 Collections:Delayed delayed函数 Delayed对象和D ...

  2. 03 Dask源码剖析-Dask的数据模型-Array

    Dask源码剖析是一个专栏,更多章节请点击文章列表查看.后续我会更新更多内容上来. 文章目录 Collection:Array Array的创建 from_array加载 Collection:Arr ...

  3. JUC并发编程02——AQS源码剖析

    1.AQS介绍 相信每个Java Coder 都使用过或者至少听说过AQS, 它是抽象队列同步器AbstractQueuedSynchronizer 的简称,在juc包下.它提供了一套可用于实现锁同步 ...

  4. LinkedList源码剖析

    1. LinkedList简介 LinkedList是基于双向循环链表(从源码中可以很容易看出)实现的,除了可以当作链表来操作外,它还可以当作栈,队列和双端队列来使用. LinkedList同样是非线 ...

  5. stl源码剖析_STL之set源码剖析

    " 源码面前,了无秘密." 之前在 小bug蕴含大能量 中讲过一个和set相关的bug,说过要从红黑树到STL 红黑树,再到STL set的源码逐步掌握整个知识架构. 最近已经把这 ...

  6. 《GDAL源码剖析与开发指南》导读

    前言 GDAL源码剖析与开发指南 GDAL全称是Geospatial Data Abstraction Library(地理空间数据抽象库),是一个在X/MIT许可协议下读写空间数据(包括栅格数据和矢 ...

  7. Chrome源码剖析、上--多线程模型、进程通信、进程模型

    Chrome源码剖析.上 原著:duguguiyu. 整理:July. 时间:二零一一年四月二日. 出处:http://blog.csdn.net/v_JULY_v. 说明:此Chrome源码剖析很大 ...

  8. CHROME源码剖析 上《转》

    转自:http://www.blogjava.net/xiaomage234/archive/2012/02/16/370122.html 原著:duguguiyu. 整理:July. 时间:二零一一 ...

  9. Chrome-Chrome源码剖析

    Chrome源码剖析 [序] && [一] [序] 开源是口好东西,它让这个充斥着大量工业垃圾代码和教材玩具代码的行业,多了一些艺术气息和美的潜质.它使得每个人,无论你来自米国纽约还是 ...

最新文章

  1. 电子工程师必须知道的10个网站 !!!
  2. Intent 传递对象
  3. Flink 容错机制:Checkpoints、Savepoints
  4. Hadoop YARN:调度性能优化实践
  5. linux 通配符 元字符 转义字符
  6. 为什么我要构建这个脚手架
  7. pythonxml读写_python xml读取和写入
  8. Build desktop apps for Windows PCs
  9. 2016/06/11
  10. Java中正则表达式替换字符串
  11. mysql skip-grant-tables my.cnf_skip-grant-tables:修改mysql密码
  12. 关于ajax的content-download时间过慢问题的解决方案与思考
  13. php和python-Python与PHP的一些区别
  14. java date.set_解决Java Calendar类set()方法的陷阱
  15. scrapy 抓取 google play 应用信息
  16. MySQL open_tables和opened_tables
  17. 医疗his系统值不值得投入使用
  18. MySQL 5.6.21下载安装之安装篇(二)
  19. 阿里云centOS安装图形界面
  20. 土拍熔断意味着什么_315土拍将解地市之渴?“熔断”来了,别高兴太早

热门文章

  1. 安装 跨模态模型CLIP 或是遇到 AttributeError: module ‘clip‘ has no attribute ‘load‘
  2. HRESULT的详细说明
  3. Monkeyrunner命令安装APK
  4. 如何如何理解区分功能测试和非功能测试
  5. 设计模式-访问者模式的应用
  6. postman断言详解
  7. 【转】给一些准备进银行IT部门的同学的建议
  8. 价格贵不是它的错而是你的错!2020年价格最贵的3款全面屏手机
  9. 公司来了个大神,服务器缩减一半,性能反而提升7倍!跪了...
  10. 【信息检索】Java简易搜索引擎原理及实现(一)建立倒排索引