我们一直都相信这样一种说法:协程是比多线程更高效的一种并发工作方式,它完全由程序本身所控制,也就是在用户态执行,协程避免了像线程切换那样产生的上下文切换,在性能方面得到了很大的提升。毫无疑问,这是颠扑不破的业界共识,是放之四海而皆准的真理。

但事实上,协程远比大多数人想象中的复杂,正因为协程的“用户态”特性,任务调度权掌握在撰写协程任务的人手里,而仅仅依赖async和await关键字远远达不到“调度”的级别,有时候反而会拖累任务效率,使其在任务执行效率上还不及“系统态”的多线程和多进程,本次我们来探讨一下Python3原生协程任务的调度管理。

Python3.10协程库async.io的基本操作

事件循环(Eventloop)是 原生协程库asyncio 的核心,可以理解为总指挥。Eventloop实例提供了注册、取消和执行任务和回调的方法。

Eventloop可以将一些异步方法绑定到事件循环上,事件循环会循环执行这些方法,但是和多线程一样,同时只能执行一个方法,因为协程也是单线程执行。当执行到某个方法时,如果它遇到了阻塞,事件循环会暂停它的执行去执行其他的方法,与此同时为这个方法注册一个回调事件,当某个方法从阻塞中恢复,下次轮询到它的时候将会继续执行,亦或者,当没有轮询到它,它提前从阻塞中恢复,也可以通过回调事件进行切换,如此往复,这就是事件循环的简单逻辑。

而上面最核心的动作就是切换别的方法,怎么切换?用await关键字:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  async def job2():  print('job2开始')  async def main():  await job1()  await job2()  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job1开始
job1结束
job2开始

是的,切则切了,可切的对吗?事实上这两个协程任务并没有达成“协作”,因为它们是同步执行的,所以并不是在方法内await了,就可以达成协程的工作方式,我们需要并发启动这两个协程任务:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  async def job2():  print('job2开始')  async def main():  #await job1()  #await job2()  await asyncio.gather(job1(), job2())  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job1开始
job2开始
job1结束

如果没有asyncio.gather的参与,协程方法就是普通的同步方法,就算用async声明了异步也无济于事。而asyncio.gather的基础功能就是将协程任务并发执行,从而达成“协作”。

但事实上,Python3.10也支持“同步写法”的协程方法:

async def create_task():  task1 = asyncio.create_task(job1())  task2 = asyncio.create_task(job2())  await task1  await task2

这里我们通过asyncio.create_task对job1和job2进行封装,返回的对象再通过await进行调用,由此两个单独的异步方法就都被绑定到同一个Eventloop了,这样虽然写法上同步,但其实是异步执行:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  async def job2():  print('job2开始')  async def create_task():  task1 = asyncio.create_task(job1())  task2 = asyncio.create_task(job2())  await task1  await task2  async def main():  #await job1()  #await job2()  await asyncio.gather(job1(), job2())  if __name__ == '__main__':  asyncio.run(create_task())

系统返回:

job1开始
job2开始
job1结束

协程任务的上下游监控

解决了并发执行的问题,现在假设每个异步任务都会返回一个操作结果:

async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"

通过asyncio.gather方法,我们可以收集到任务执行结果:

async def main():  res = await asyncio.gather(job1(), job2())  print(res)

并发执行任务:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  async def main():  res = await asyncio.gather(job1(), job2())  print(res)  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job1开始
job2开始
job1结束
['job1', 'job2']

但任务结果仅仅也就是方法的返回值,除此之外,并没有其他有价值的信息,对协程任务的执行明细讳莫如深。

现在我们换成asyncio.wait方法:

async def main():  res = await asyncio.wait([job1(), job2()])  print(res)

依然并发执行:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  async def main():  res = await asyncio.wait([job1(), job2()])  print(res)  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job1开始
job2开始
job1结束
({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任务结果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任务结果'>}, set())

可以看出,asyncio.wait返回的是任务对象,里面存储了大部分的任务信息,包括执行状态。

在默认情况下,asyncio.wait会等待全部任务完成 (return_when=‘ALL_COMPLETED’),它还支持 return_when=‘FIRST_COMPLETED’(第一个协程完成就返回)和 return_when=‘FIRST_EXCEPTION’(出现第一个异常就返回)。

这就非常令人兴奋了,因为如果异步消费任务是发短信之类的需要统计达到率的任务,利用asyncio.wait特性,我们就可以第一时间记录任务完成或者异常的具体时间。

协程任务守护

假设由于某种原因,我们手动终止任务消费:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  async def main():  task1 = asyncio.create_task(job1())  task2 = asyncio.create_task(job2())  task1.cancel()  res = await asyncio.gather(task1, task2)  print(res)  if __name__ == '__main__':  asyncio.run(main())

系统报错:

File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main  res = await asyncio.gather(task1, task2)
asyncio.exceptions.CancelledError  

这里job1被手动取消,但会影响job2的执行,这违背了协程“互相提携”的特性。

事实上,asyncio.gather方法可以捕获协程任务的异常:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  async def main():  task1 = asyncio.create_task(job1())  task2 = asyncio.create_task(job2())  task1.cancel()  res = await asyncio.gather(task1, task2,return_exceptions=True)  print(res)  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job2开始
[CancelledError(''), 'job2任务结果']

可以看到job1没有被执行,并且异常替代了任务结果作为返回值。

但如果协程任务启动之后,需要保证任务情况下都不会被取消,此时可以使用asyncio.shield方法守护协程任务:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  async def main():  task1 = asyncio.shield(job1())  task2 = asyncio.create_task(job2())  res = await asyncio.gather(task1, task2,return_exceptions=True)  task1.cancel()  print(res)  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job1开始
job2开始
job1结束
['job1任务结果', 'job2任务结果']

协程任务回调

假设协程任务执行完毕之后,需要立刻进行回调操作,比如将任务结果推送到其他接口服务上:

import asyncio  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  def callback(future):  print(f'回调任务: {future.result()}')  async def main():  task1 = asyncio.shield(job1())  task2 = asyncio.create_task(job2())  task1.add_done_callback(callback)  res = await asyncio.gather(task1, task2,return_exceptions=True)  print(res)  if __name__ == '__main__':  asyncio.run(main())

这里我们通过add_done_callback方法对job1指定了callback方法,当任务执行完以后,callback会被调用,系统返回:

job1开始
job2开始
job1结束
回调任务: job1任务结果
['job1任务结果', 'job2任务结果']

与此同时,add_done_callback方法不仅可以获取协程任务返回值,它自己也支持参数参数传递:

import asyncio
from functools import partial  async def job1():  print('job1开始')  await asyncio.sleep(1)  print('job1结束')  return "job1任务结果"  async def job2():  print('job2开始')  return "job2任务结果"  def callback(future,num):  print(f"回调参数{num}")  print(f'回调任务: {future.result()}')  async def main():  task1 = asyncio.shield(job1())  task2 = asyncio.create_task(job2())  task1.add_done_callback(partial(callback,num=1))  res = await asyncio.gather(task1, task2,return_exceptions=True)  print(res)  if __name__ == '__main__':  asyncio.run(main())

系统返回:

job1开始
job2开始
job1结束
回调参数1
回调任务: job1任务结果
['job1任务结果', 'job2任务结果']

结语

成也用户态,败也用户态。所谓水能载舟亦能覆舟,协程消费任务的调度远比多线程的系统级调度要复杂,稍不留神就会造成业务上的“同步”阻塞,弄巧成拙,适得其反。这也解释了为什么相似场景中多线程的出场率要远远高于协程,就是因为多线程不需要考虑启动后的“切换”问题,无为而为,简单粗暴。

运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践相关推荐

  1. Generator(生成器),入门初基,Coroutine(原生协程),登峰造极,Python3.10并发异步编程async底层实现

    普遍意义上讲,生成器是一种特殊的迭代器,它可以在执行过程中暂停并在恢复执行时保留它的状态.而协程,则可以让一个函数在执行过程中暂停并在恢复执行时保留它的状态,在Python3.10中,原生协程的实现手 ...

  2. 小议Python3的原生协程机制

    此文已由作者张耕源授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 在最近发布的 Python 3.5 版本中,官方正式引入了 async/await关键字.在 asyncio ...

  3. python 协程 asyncio_Python 原生协程------asyncio(选自公众号)

    所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知. Asyncio 是并发(concurrency)的一种方式.对 Python 来说,并 ...

  4. python——asyncio模块实现协程、异步编程(一)

    我们都知道,现在的服务器开发对于IO调度的优先级控制权已经不再依靠系统,都希望采用协程的方式实现高效的并发任务,如js.lua等在异步协程方面都做的很强大. python在3.4版本也加入了协程的概念 ...

  5. python并发编程:协程asyncio、多线程threading、多进程multiprocessing

    python并发编程:协程.多线程.多进程 CPU密集型计算与IO密集型计算 多线程.多进程与协程的对比 多线程 创建多线程的方法 多线程实现的生产者-消费者爬虫 Lock解决线程安全问题 使用线程池 ...

  6. python asyncio教程_Python 协程模块 asyncio 使用指南

    Python 协程模块 asyncio 使用指南 前面我们通过5 分钟入门 Python 协程了解了什么是协程,协程的优点和缺点和如何在 Python 中实现一个协程.没有看过的同学建议去看看.这篇文 ...

  7. Python 协程 asyncio 极简入门与爬虫实战

    在了解了 Python 并发编程的多线程和多进程之后,我们来了解一下基于 asyncio 的异步IO编程--协程 01 协程简介 协程(Coroutine)又称微线程.纤程,协程不是进程或线程,其执行 ...

  8. python3之协程(3)---greenlet实现协程操作

    原文链接:https://www.cnblogs.com/xybaby/p/6337944.html 正文 在前面的文章中提到python原生的generator是semicoroutine,而gre ...

  9. 爬虫的单线程+多任务异步协程:asyncio 3.6

    单线程+多任务异步协程:asyncio 3.6 事件循环 无限循环的对象.事件循环中最终需要将一些 特殊的函数(被async关键字修饰的函数) 注册在该对象中. 协程 本质上是一个对象.可以把协程对象 ...

  10. python协程asyncio使用

    协程 协程 (corountine):又称微线程. asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行. 实现协程就是要多个任务的循环,await是挂起命令.每到一个地方awai ...

最新文章

  1. (转)如何成为顶级生物信息学家?
  2. 知识图谱入门知识(五)【转】秒懂词向量Word2Vec的本质
  3. 7723java梦游游戏,渠道SDK登录
  4. float相乘后的类型_4、Python语法入门之基本数据类型
  5. Word2Vec算法详解(CBOW和skip-gram算法详解)
  6. Web移动端常见问题-摘抄
  7. 【物理应用】基于matlab GUI功率谱估计【含Matlab源码 329期】
  8. 【插值】插值方法原理详解
  9. 51单片机AD模块PCF8591 1路AD采样+数码管显示+Proteus仿真
  10. 如何手动启动消防广播_消防应急广播应如何规范设置
  11. 【修真院java小课堂】代码生成
  12. 杭州电子科技大学研究生计算机专业目录,杭州电子科技大学2017年硕士计算机学院招生目录.pdf...
  13. 如何快速开发一个响应式移动端页面
  14. Tkinter 组件详解(一):Label
  15. 想要出国读博作博后的看过来:德国马普育种所植物与微生物互作方向招收3名博士1名博后
  16. mysql连接oracle视图_oracle数据库视图
  17. 带分数——排列和组合
  18. Java 实现阿里云直播
  19. VMware 11 详细安装步骤(图解)
  20. 阿里平头哥无剑100SOCwujian100挂UART外设之①将无剑100下载到gensys开发板

热门文章

  1. F. Fitness Baker
  2. s3cmd配置bucket生命周期
  3. 缺陷管理工具--mantis使用过程
  4. 【WordPress】添加备案信息
  5. ubuntu系统调节电脑亮度
  6. matlab legend颜色不变,关于MATLAB画图中legend标注曲线颜色不匹配问题
  7. 渲染用计算机功耗,【IT之家评测室】满功耗 RTX 3060 笔记本 GPU 表现如何?拯救者 R9000P 实测...
  8. 关于Windows 2003 安装Inter G33/G31 显卡问题
  9. 拍森python百度百科_python对拍_python 拍牌_拍森python - 云+社区 - 腾讯云
  10. Android数据加密之——Base64编码算法