大家好,并发编程 进入第十章。

好了,今天的内容其实还挺多的,我准备了三天,到今天才整理完毕。希望大家看完,有所收获的,能给小明一个赞。这就是对小明最大的鼓励了。

为了更好地衔接这一节,我们先来回顾一下上一节的内容。

上一节「」,我们首先介绍了,如何创建一个协程对象.

主要有两种方法

通过async关键字,

通过@asyncio.coroutine 装饰函数。

然后有了协程对象,就需要一个事件循环容器来运行我们的协程。其主要的步骤有如下几点:

将协程对象转为task任务对象

定义一个事件循环对象容器用来存放task

将task任务扔进事件循环对象中并触发

为了让大家,对生成器和协程有一个更加清晰的认识,我还介绍了yield和async/await的区别。

最后,我们还讲了,如何给一个协程添加回调函数。

好了,用个形象的比喻,上一节,其实就只是讲了协程中的单任务。哈哈,是不是还挺难的?希望大家一定要多看几遍,多敲代码,不要光看哦。

那么这一节,我们就来看下,协程中的多任务。

.本文目录

协程中的并发

协程中的嵌套

协程中的状态

gather与wait

.协程中的并发

协程的并发,和线程一样。举个例子来说,就好像 一个人同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其他两个馒头。就这样交替换着吃。

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。

第一步,当然是创建多个协程的列表。

# 协程函数

async def do_some_work(x):

print('Waiting: ', x)

await asyncio.sleep(x)

return 'Done after {}s'.format(x)

# 协程对象

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

# 将协程转成task,并组成list

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

第二步,如何将这些协程注册到事件循环中呢。

有两种方法,至于这两种方法什么区别,稍后会介绍。

使用asyncio.wait()

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

使用asyncio.gather()

# 千万注意,这里的 「*」 不能省略

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

最后,return的结果,可以用task.result()查看。

for task in tasks:

print('Task ret: ', task.result())

完整代码如下

import asyncio

# 协程函数

async def do_some_work(x):

print('Waiting: ', x)

await asyncio.sleep(x)

return 'Done after {}s'.format(x)

# 协程对象

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

# 将协程转成task,并组成list

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:

print('Task ret: ', task.result())

输出结果

Waiting: 1

Waiting: 2

Waiting: 4

Task ret: Done after 1s

Task ret: Done after 2s

Task ret: Done after 4s

.协程中的嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

来看个例子。

import asyncio

# 用于内部的协程函数

async def do_some_work(x):

print('Waiting: ', x)

await asyncio.sleep(x)

return 'Done after {}s'.format(x)

# 外部的协程函数

async def main():

# 创建三个协程对象

coroutine1 = do_some_work(1)

coroutine2 = do_some_work(2)

coroutine3 = do_some_work(4)

# 将协程转为task,并组成list

tasks = [

asyncio.ensure_future(coroutine1),

asyncio.ensure_future(coroutine2),

asyncio.ensure_future(coroutine3)

]

# 【重点】:await 一个task列表(协程)

# dones:表示已经完成的任务

# pendings:表示未完成的任务

dones, pendings = await asyncio.wait(tasks)

for task in dones:

print('Task ret: ', task.result())

loop = asyncio.get_event_loop()

loop.run_until_complete(main())

如果这边,使用的是asyncio.gather(),是这么用的

# 注意这边返回结果,与await不一样

results = await asyncio.gather(*tasks)

for result in results:

print('Task ret: ', result)

输出还是一样的。

Waiting: 1

Waiting: 2

Waiting: 4

Task ret: Done after 1s

Task ret: Done after 2s

Task ret: Done after 4s

仔细查看,可以发现这个例子完全是由 上面「协程中的并发」例子改编而来。结果完全一样。只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。

其实你如果去看下asyncio.await()的源码的话,你会发现下面这种写法

loop.run_until_complete(asyncio.wait(tasks))

看似没有嵌套,实际上内部也是嵌套的。

这里也把源码,贴出来,有兴趣可以看下,没兴趣,可以直接跳过。

# 内部协程函数

async def _wait(fs, timeout, return_when, loop):

assert fs, 'Set of Futures is empty.'

waiter = loop.create_future()

timeout_handle = None

if timeout is not None:

timeout_handle = loop.call_later(timeout, _release_waiter, waiter)

counter = len(fs)

def _on_completion(f):

nonlocal counter

counter -= 1

if (counter <= 0 or

return_when == FIRST_COMPLETED or

return_when == FIRST_EXCEPTION and (not f.cancelled() and

f.exception() is not None)):

if timeout_handle is not None:

timeout_handle.cancel()

if not waiter.done():

waiter.set_result(None)

for f in fs:

f.add_done_callback(_on_completion)

try:

await waiter

finally:

if timeout_handle is not None:

timeout_handle.cancel()

done, pending = set(), set()

for f in fs:

f.remove_done_callback(_on_completion)

if f.done():

done.add(f)

else:

pending.add(f)

return done, pending

# 外部协程函数

async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):

if futures.isfuture(fs) or coroutines.iscoroutine(fs):

raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

if not fs:

raise ValueError('Set of coroutines/Futures is empty.')

if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):

raise ValueError(f'Invalid return_when value: {return_when}')

if loop is None:

loop = events.get_event_loop()

fs = {ensure_future(f, loop=loop) for f in set(fs)}

# 【重点】:await一个内部协程

return await _wait(fs, timeout, return_when, loop)

.协程中的状态

还记得我们在讲生成器的时候,有提及过生成器的状态。同样,在协程这里,我们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。

Pending:创建future,还未执行

Running:事件循环正在调用执行任务

Done:任务执行完毕

Cancelled:Task被取消后的状态

可手工 python3 xx.py 执行这段代码,

import asyncio

import threading

import time

async def hello():

print("Running in the loop...")

flag = 0

while flag < 1000:

with open("F:\\test.txt", "a") as f:

f.write("------")

flag += 1

print("Stop the loop")

if __name__ == '__main__':

coroutine = hello()

loop = asyncio.get_event_loop()

task = loop.create_task(coroutine)

# Pending:未执行状态

print(task)

try:

t1 = threading.Thread(target=loop.run_until_complete, args=(task,))

# t1.daemon = True

t1.start()

# Running:运行中状态

time.sleep(1)

print(task)

t1.join()

except KeyboardInterrupt as e:

# 取消任务

task.cancel()

# Cacelled:取消任务

print(task)

finally:

print(task)

顺利执行的话,将会打印 Pending -> Pending:Runing -> Finished 的状态变化

假如,执行后 立马按下 Ctrl+C,则会触发task取消,就会打印 Pending -> Cancelling -> Cancelling 的状态变化。

.gather与wait

还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗?

使用asyncio.wait()

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

使用asyncio.gather()

# 千万注意,这里的 「*」 不能省略

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

asyncio.gather 和 asyncio.wait 在asyncio中用得的比较广泛,这里有必要好好研究下这两货。

还是照例用例子来说明,先定义一个协程函数

import asyncio

async def factorial(name, number):

f = 1

for i in range(2, number+1):

print("Task %s: Compute factorial(%s)..." % (name, i))

await asyncio.sleep(1)

f *= i

print("Task %s: factorial(%s) = %s" % (name, number, f))

接收参数方式

asyncio.wait

接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。

它可以这样,用asyncio.ensure_future转为task对象

tasks=[

asyncio.ensure_future(factorial("A", 2)),

asyncio.ensure_future(factorial("B", 3)),

asyncio.ensure_future(factorial("C", 4))

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

也可以这样,不转为task对象。

loop = asyncio.get_event_loop()

tasks=[

factorial("A", 2),

factorial("B", 3),

factorial("C", 4)

]

loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather

接收的就比较广泛了,他可以接收list对象,但是 * 不能省略

tasks=[

asyncio.ensure_future(factorial("A", 2)),

asyncio.ensure_future(factorial("B", 3)),

asyncio.ensure_future(factorial("C", 4))

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

还可以这样,和上面的 * 作用一致,这是因为asyncio.gather()的第一个参数是 *coros_or_futures,它叫 非命名键值可变长参数列表,可以集合所有没有命名的变量。

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(

factorial("A", 2),

factorial("B", 3),

factorial("C", 4),

))

甚至还可以这样

loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])

group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])

group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])

loop.run_until_complete(asyncio.gather(group1, group2, group3))

返回结果不同

asyncio.wait

asyncio.wait 返回dones和pendings

dones:表示已经完成的任务

pendings:表示未完成的任务

如果我们需要获取,运行结果,需要手工去收集获取。

dones, pendings = await asyncio.wait(tasks)

for task in dones:

print('Task ret: ', task.result())

asyncio.gather

asyncio.gather 它会把值直接返回给我们,不需要手工去收集。

results = await asyncio.gather(*tasks)

for result in results:

print('Task ret: ', result)

wait有控制功能

import asyncio

import random

async def coro(tag):

await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

# 【控制运行任务数】:运行第一个任务就返回

# FIRST_COMPLETED :第一个任务完全返回

# FIRST_EXCEPTION:产生第一个异常返回

# ALL_COMPLETED:所有任务完成返回 (默认选项)

dones, pendings = loop.run_until_complete(

asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

print("第一次完成的任务数:", len(dones))

# 【控制时间】:运行一秒后,就返回

dones2, pendings2 = loop.run_until_complete(

asyncio.wait(pendings, timeout=1))

print("第二次完成的任务数:", len(dones2))

# 【默认】:所有任务完成后返回

dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))

print("第三次完成的任务数:", len(dones3))

loop.close()

输出结果

第一次完成的任务数: 1

第二次完成的任务数: 4

第三次完成的任务数: 5

python 异步io框架_Python并发编程之学习异步IO框架:asyncio 中篇(十)相关推荐

  1. python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))...

    需要注意一下 不能无限的开进程,不能无限的开线程 最常用的就是开进程池,开线程池.其中回调函数非常重要 回调函数其实可以作为一种编程思想,谁好了谁就去掉 只要你用并发,就会有锁的问题,但是你不能一直去 ...

  2. python线程通信 消息传递_Python并发编程之线程消息通信机制/任务协调(四)

    大家好,并发编程进入第四篇. 本文目录 前言 Event事件 Condition Queue队列 总结 .前言 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非 ...

  3. python互斥锁原理_python并发编程之多进程1------互斥锁与进程间的通信

    一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理. 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行 ...

  4. python并发处理同一个文件_python并发编程(并发与并行,同步和异步,阻塞与非阻塞)...

    最近在学python的网络编程,学会了socket通信,并利用socket实现了一个具有用户验证功能,可以上传下载文件.可以实现命令行功能,创建和删除文件夹,可以实现的断点续传等功能的FTP服务器.但 ...

  5. python线程池模块_python并发编程之进程池,线程池,协程

    需要注意一下 不能无限的开进程,不能无限的开线程 最常用的就是开进程池,开线程池.其中回调函数非常重要 回调函数其实可以作为一种编程思想,谁好了谁就去掉 只要你用并发,就会有锁的问题,但是你不能一直去 ...

  6. python并发处理list数据_python并发编程之多进程2--------数据共享及进程池和回调函数...

    一.数据共享 1.进程间的通信应该尽量避免共享数据的方式 2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的. 虽然进程间数据独立,但可以用过Manager实现数据共享,事实 ...

  7. python创建新进程_Python并发编程(进程的创建)

    动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的. 并发性:任何进程都可以同其他进程一起并发执行 独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的 ...

  8. python线程池模块_Python并发编程之线程池/进程池--concurrent.futures模块

    一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/ ...

  9. python 多线程 数据库死锁_python并发编程之多线程2死锁与递归锁,信号量等

    一.死锁现象与递归锁 进程也是有死锁的 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用, 这些永远在互相等待的进程称为死锁进程 如下就是死锁 ...

最新文章

  1. C++拾取——使用stl标准库实现排序算法及评测
  2. 我又被学弟学妹倒挂了
  3. 几种流行的AJAX框架:jQuery,Mootools,Dojo,Ext JS的对比
  4. 《商业洞察力30讲》学习笔记(上)
  5. 【渝粤教育】国家开放大学2018年春季 8256-21T药物治疗学 参考试题
  6. C语言中static关键字的作用详解,全网最透彻
  7. ssm会员商城管理系统答辩PPT免费下载
  8. 【Oracle】数据迁移工具(2):Data Dump
  9. web高德地图怎么加载离线地图_春节变胖了?高德地图隐藏的实用跑步功能 想怎么跑都随你...
  10. codeforces Div.2(5.21)B题
  11. Office2010安装需要MSXML版本6.10.1129.0的方法
  12. Window下常见的权限维持方式
  13. 使用360安全卫士对计算机进行体检,360安全卫士使用教程 电脑体检
  14. Spyder5 启动报错 spyder-kernels
  15. K线形态识别—K线整理形态
  16. 如果你现在没有目标,或许很迷茫
  17. Android APP签名找回终极版
  18. 怎样查看ie浏览器的版本号
  19. Kubeadm搭建高可用K8S(四)Dashboard安装配置
  20. 简明解释算法中的大O符号

热门文章

  1. null值和空值的区别
  2. 推荐一款提高工作效率的屏幕扩展软件—Splashtop Wired XDisplay
  3. Fastadmin管理Mysql_Fastadmin笔记
  4. linux-ubuntu下使用linuxdeployqt+appimagetool将qt程序打包成xxx.AppImage文件
  5. 让你的微信小程序具有在线支付功能
  6. 开通小米公交卡服务器维护中是什么意思,小米公交卡应用指南,前2点必须注意,3分钟教会...
  7. 用流程图来描述一个App的启动功能
  8. CWeixin 类升级
  9. 航空航天大类C语言程设第三次练习赛
  10. 一次弄懂香浓极限的含义(几种信噪比含义探讨)