创建数据处理管道

问题

你想以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,你有个大量的数据需要处理,但是不能将它们一次性放入内存中。

解法

生成器函数是一个实现管道机制的好办法。为了演示,假定你要处理一个非常大的日志文件目录:

foo/access-log-012007.gzaccess-log-022007.gzaccess-log-032007.gz...access-log-012008
bar/access-log-092007.bz2...access-log-022008

假设每个日志文件包含这样的数据:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。就像这样:

import os
import fnmatch
import gzip
import bz2
import re
def gen_find(filepat, top):'''Find all filenames in a directory tree that match a shell wildcard pattern'''for path, dirlist, filelist in os.walk(top):for name in fnmatch.filter(filelist, filepat):yield os.path.join(path,name)def gen_opener(filenames):'''Open a sequence of filenames one at a time producing a file object.The file is closed immediately when proceeding to the next iteration.'''for filename in filenames:if filename.endswith('.gz'):f = gzip.open(filename, 'rt')elif filename.endswith('.bz2'):f = bz2.open(filename, 'rt')else:f = open(filename, 'rt')yield ff.close()def gen_concatenate(iterators):'''Chain a sequence of iterators together into a single sequence.'''for it in iterators:yield from itdef gen_grep(pattern, lines):'''Look for a regex pattern in a sequence of lines'''pat = re.compile(pattern)for line in lines:if pat.search(line):yield line

现在你可以很容易的将这些函数连起来创建一个处理管道。比如,为了查找包含单词 python 的所有日志行,你可以这样做:

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
for line in pylines:print(line)

如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。比如,下面这个版本计算出传输的字节数并计算其总和。

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))

讨论

以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。

为了理解上述代码,重点是要明白 yield 语句作为数据的生产者而 for 循环语句作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个单独的数据元素传递给迭代处理管道的下一阶段。在例子最后部分, sum() 函数是最终的程序驱动者,每次从生成器管道中提取出一个元素。

这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就很容易编写和维护它们了。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。

使用这种方式的内存效率也不得不提。上述代码即便是在一个超大型文件目录中也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。

在调用 gen concatenate() 函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。 itertools.chain() 函数同样有类似的功能,但是它需要将所有可迭代对象最为参数传入。在上面这个例子中,你可能会写类似这样的语句 lines = itertools.chain(*files) ,使得 gen_opener() 生成器能被全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭代步骤时文件就关闭了,因此 china() 在这里不能这样使用。上面的方案可以避免这种情况。

gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父生成器上去。语句 yield from it 简单的返回生成器 it 所产生的所有值。关于这个我们在 4.14 小节会有更进一步的描述。

最后还有一点需要注意的是,管道方式并不是万能的。有时候你想立即处理所有数据。然而,即便是这种情况,使用生成器管道也可以将这类问题从逻辑上变为工作流的处理方式。

David Beazley 在他的 Generator Tricks for Systems Programmers 教程中对于这种技术有非常深入的讲解。可以参考这个教程获取更多的信息。

《Python Cookbook 3rd》笔记(4.13):创建数据处理管道相关推荐

  1. 《Python Cookbook 3rd》笔记汇总

    文章目录 一.数据结构 二.字符串和文本 三.数字.日期和时间 四.迭代器与生成器 五.文件与IO 一.数据结构 标题 关键词 1.1:拆分序列后赋值给多个变量 可迭代对象.拆分赋值 1.2:拆分任意 ...

  2. 《Python Cookbook 3rd》笔记(1.4):查找最大或最小的N个元素

    <Python Cookbook 3rd>1.4:查找最大或最小的N个元素 问题 怎样从一个集合中获得最大或者最小的N个元素列表? 解法 heapq 模块有两个函数:nlargest()和 ...

  3. 《Python cookbook》笔记二

    <Python cookbook>笔记二 第二章 字符串和文本 -使用多个界定符分割字符串- 你需要将一个字符串分割为多个字段,但是分隔符 (还有周围的空格) 并不是固定 的. # str ...

  4. Machine Learning with Python Cookbook 学习笔记 第8章

    Chapter 8. Handling Images 前言 本笔记是针对人工智能典型算法的课程中Machine Learning with Python Cookbook的学习笔记 学习的实战代码都放 ...

  5. Machine Learning with Python Cookbook 学习笔记 第9章

    Chapter 9. Dimensionality Reduction Using Feature Extraction 前言 本笔记是针对人工智能典型算法的课程中Machine Learning w ...

  6. Machine Learning with Python Cookbook 学习笔记 第6章

    Chapter 6. Handling Text 本笔记是针对人工智能典型算法的课程中Machine Learning with Python Cookbook的学习笔记 学习的实战代码都放在代码压缩 ...

  7. Python Cookbook 3rd Edition Documentation

    Python Cookbook 3rd Edition Documentation 文章目录 第一章:数据结构和算法 1.1 解压序列赋值给多个变量 问题 解决方案 讨论 1.2 解压可迭代对象赋值给 ...

  8. 《Python cookbook》笔记一

    <Python cookbook> 第一章 数据结构和算法 - *号解压多个变量 - 如果一个可迭代对象的元素个数超过变量个数时,会抛出一个 ValueError .那么 怎样才能从这个可 ...

  9. 《Python Cookbook 3rd》笔记(5.19):创建临时文件和文件夹

    创建临时文件和文件夹 问题 你需要在程序执行时创建一个临时文件或目录,并希望使用完之后可以自动销毁掉. 解法 tempfile 模块中有很多的函数可以完成这任务.为了创建一个匿名的临时文件,可以使用 ...

最新文章

  1. 开发部署提速8倍!这款IDE插件了解一下?
  2. how to become a very good candidate at the investment bank?
  3. 一种新的穿透防火墙的数据传输技术
  4. iis设置服务器文件权限设置,iis7如何配置写入权限以上传文件---急急急
  5. Exchange2010应用地址列表
  6. AE物体表面跟踪特效合成高级插件:Lockdown for Mac
  7. 用事件驱动编程解救臃肿的代码
  8. Hyperledger Fabric 1.0 从零开始(十二)——fabric-sdk-java应用【补充】
  9. win10X64 + vs2015通过Cmake编译Opencv(一)
  10. 流编辑器 SED 十分钟入门全教程
  11. python游戏开发引擎_【图片】有关Ren'Py引擎的游戏制作汉化教程【linux游戏吧】_百度贴吧...
  12. java sql 搜索拼音
  13. 使用jQuery美化英雄联盟
  14. 黑莓7290无信号问题
  15. 数据库建模工具Pdman
  16. 【案例】做一个电话簿(二)
  17. Java基础-数组01
  18. 仿支付宝头像外加一个边框的工具类
  19. Ciphertext-Policy Attribute-Based Encryption (CP-ABE) Toolkit Installation
  20. python怎么另存为_python如何IE另存为附件

热门文章

  1. 在WinCE5.0和WinCE6.0下,编译选项介绍
  2. virtualbox主机网络管理 未能创建_如何在 VirtualBox 中增加现有虚拟机的磁盘大小 | Linux 中国...
  3. java方法有excel实现_Java实现EXCEL操作(1)
  4. 单选按钮_PerlTk教程之按钮Button、复选按钮Checkbutton、单选按钮Radiobutton(附完整代码)...
  5. hadoop namenode启动不了_集群版hadoop安装,写给大忙人看的
  6. 【转】ABP源码分析四十四:ZERO的配置
  7. 【转】文本文件和二进制文件区别及java中字节流和字符流归纳
  8. ABP入门系列(5)——展现层实现增删改查
  9. 【MOSS】快速调试Sharepoint站点
  10. hive或mysql报错Too many connections