【前言】

  • 最近一段时间在学习使用Python进行异步编程,与C++、Java等编译型语言类型类似,Python也提供了一些可用于异步编程的内建模块。例如,支持多线程处理的threading模块、支持多进程处理的multiprocessing 模块、以及为异步执行(包括多线程和多进程)提供高级接口的concurrent.futures 模块等。由于学习过程中涉及到的知识点较多,因此有了写这篇文章的想法。一来可重新整理一下学习过程中的疑惑和学习心得,二来做个记录,方便以后查阅需要。由于Python存在全局解释器锁(GIL),因此,若处理的任务属于CPU密集型,则应使用多进程;若处理的任务属于I/O密集型,则应使用多线程。本文着重介绍使用Python多线程进行异步编程,以后有时间的话,会另写一篇使用Python多进程进行异步编程的文章。
  • 本篇文章也假设读者已经明白“阻塞”、“异步”、“锁”等术语背后的基本原理,本篇文章的所有示例均基于threading 模块。
  • 本篇文章的解释内容包括:创建线程

    线程锁

    关闭/中断线程

    基于线程实现取消服务

【创建线程】

  • Python中可通过调用threading.Thread直接创建线程,如下所示,调用threading.Thread通常需要传入两个参数:target为“线程函数”;args为传递给“线程函数”的参数,必须为tuple类型。
import threading
def worker(name):print("thread %s is running" % name)
worker = threading.Thread(target=worker, args=('worker',))
worker.start()
worker.join()

  • 也可通过继承threading.Thread类创建线程,如下所示。其中super(MyThread, self).__init__()表示对继承自父类(threading.Thread)的属性进行初始化,并用父类的初始化方法来初始化继承的属性。此外,继承自threading.Thread类的子类必须具有run()方法。线程启动时,自动调用每个线程实例的run()方法。
import threading
import timeclass MyThread(threading.Thread):def __init__(self, delay):super(MyThread, self).__init__()self.delay = delaydef run(self):while True:print("sleep %ds" % self.delay)time.sleep(self.delay)delays = [2, 4, 6]
threads = []
for delay in delays:threads.append(MyThread(delay))
for t in threads:t.start()
for t in threads:t.join()

运行结果如下所示

【线程锁】Python3 多线程 | 菜鸟教程

如果多个线程同时对某个资源进行“写操作”,则可能会出现不可预料的结果。考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。因此,为了避免这种情况即为了保证资源的正确性,需要引入锁的概念。锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。

使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些每次只允许一个线程操作的资源,可以将对其进行的操作放到 acquire 和 release 方法之间,如下所示。

import threading
import timeclass myThread (threading.Thread):def __init__(self, threadID, name, counter):#super(myThread, self).__init__()threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.counter = counterdef run(self):print ("开启线程: " + self.name)# 获取锁,用于线程同步threadLock.acquire()print_time(self.name, self.counter, 3)# 释放锁,开启下一个线程threadLock.release()def print_time(threadName, delay, counter):while counter:time.sleep(delay)print ("%s: %s" % (threadName, time.ctime(time.time())))counter -= 1threadLock = threading.Lock()
threads = []# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)# 开启新线程
thread1.start()
thread2.start()# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)# 等待所有线程完成
for t in threads:t.join()
print ("退出主线程")

运行结果如下所示

当然,Python 的 Queue 模块中提供了同步的线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。此处,不再赘述。

【关闭/中断线程】

  • 借助ctypes 第三方模块采用抛出异常的方式强制中断线程,但是如果线程之间涉及到“锁”,那么采用这种方式强制关闭线程有可能会导致“不可预期的结果”。
def _async_raise(tid, exctype):"""Raises an exception in the threads with id tid"""if not inspect.isclass(exctype):raise TypeError("Only types can be raised (not instances)")res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype))if res == 0:raise ValueError("invalid thread id")elif res != 1:# """if it returns a number greater than one, you're in trouble,# and you should call it again with exc=NULL to revert the effect"""ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)raise SystemError("PyThreadState_SetAsyncExc failed")def stop_thread(thread):_async_raise(thread.ident, SystemExit)

  • 更合理的方式是使用threading.Event()创建一个事件管理标记flag,然后线程在运行过程中以极短的时间间隔访问flag的状态值,一旦flag的状态值满足“关闭线程”,立刻执行关闭线程操作。
class MyThread(threading.Thread):"""Thread class with a stop() method. The thread itself has to checkregularly for the stopped() condition."""def __init__(self, *args, **kwargs):super(MyThread, self).__init__(*args, **kwargs)self._stop_event = threading.Event()def stop(self):self._stop_event.set()def stopped(self):return self._stop_event.is_set()def run(self):while True:#做任务print("sleep 1s")time.sleep(1)if self.stopped():# 做一些必要的收尾工作break

【基于线程实现取消服务】

考虑这样一个场景:调度器A和执行器B之间通过web通信,调度器A给执行器B传入“执行”参数,执行器B基于单线程或多线程开始执行任务;之后,调度器A给执行器B传入了“取消”参数,执行器B取消当前正在执行的任务。那如何实现“取消”服务呢?

解决思路:当调度器A给执行器B传递参数后,执行器B端首先判断参数为“执行”操作还是“取消”操作。若为“执行”操作,执行器B端新建一个“线程池”并开始执行任务;若为“取消”操作,执行器B端将“线程池”中的线程资源利用“threading.Event()”的方式合理关闭,并清理或重新初始化“线程池”。需要注意的是,执行器B端的“线程池”对象需要为全局变量。Demo代码如下所示。

模拟调度器A端代码

import requests#json_data = {"cancel": "yes"}
json_data = {"nums": 2, "cancel": "no"}r = requests.post("http://127.0.0.1:5000/access_api", json=json_data)
print(r.text, type(r.text))

模拟执行器B端封装web API代码

import json
from flask import Flask, request
from test_API.test_process import MyThreadapp = Flask(__name__)
threads = []@app.route('/')
def hello_world():return 'hello world'@app.route('/access_api', methods=['POST'])
def access_api():#print(request.headers)global threads#声明变量类型为全局变量state = Nonetry:print(threads)cancel = request.json['cancel']if cancel == "no":print("for testing of performing")nums = request.json['nums']for i in range(nums):threads.append(MyThread(i+1))for t in threads:t.start()elif cancel == "yes":print('for testing of canceling')for t in threads:t.stop()#MyThread.stop()threads = []state = 'Successful'except:state = 'Failed'finally:return stateif __name__ == '__main__':app.run(host='0.0.0.0', port=5000, debug=True)#本地访问ip为127.0.0.1#局域网其它电脑访问:提供本电脑的ip地址

模拟执行器B端执行逻辑代码

import json
import time
import threadingclass MyThread(threading.Thread):"""Thread class with a stop() method. The thread itself has to checkregularly for the stopped() condition."""def __init__(self, delay):super(MyThread, self).__init__()self._stop_event = threading.Event()self.delay = delaydef stop(self):self._stop_event.set()def stopped(self):return self._stop_event.is_set()def run(self):print("begin run the child thread")while True:print("sleep %ds" % self.delay)time.sleep(self.delay)if self.stopped():# 做一些必要的收尾工作break

当cancel值为no时,测试结果如下所示

重新传入cancel值为yes时,测试结果如下

再重新传入cancel值为no时,测试结果如下

  • 小结1:基于以上测试结果,demo代码可实现了“取消”服务,而且“取消”方式也不是采用抛出异常的方式强制关闭线程,而是采用设置线程状态标记值的方式合理关闭。但请注意,采用设置线程状态标记值的方式关闭线程,只能在当前正在运行的这次循环结束之后起作用。也就是说,在执行下次循环中的print时,线程检测到标记值已被重置,因此退出循环。这种行为的典型应用场景是服务器监听客户端的访问这种类型!
  • 小结2:当然,如果线程之间不涉及到“锁”,采用抛出异常的方式也可!该种方式适用于单次且耗时任务的执行!比如,从mysql数据库迁移数据至oracle数据库。整个过程涉及到数据库读、数据库写,比较耗时。此时,若想立刻结束该服务,应采用抛出异常的方式立即中断线程。但应做好数据库的回滚等后续处理,尽量降低“不可预期行为”发生的可能性。
  • 小结3:对“线程池”资源threads粗暴地进行重新赋值的方式并不可取,应该对“线程池”资源进行合理释放并重新初始化

就写这么多吧,欢迎讨论指正!!!

async python两个_Python多线程一览相关推荐

  1. python线程唤醒_python 多线程

    python 多线程 真正的多线程吗? 对于多核处理器,在同一时间确实可以多个线程独立运行,但在Python中确不是这样的了.原因在于,python虚拟机中引入了GIL这一概念.GIL(Global ...

  2. async python两个_【Python】python中实现多进程与多线程

    进程与线程 进程(process)就是任务,是计算机系统进行资源分配和调度的基本单位[1].比如,打开一个word文件就是启动了一个word进程. 线程(thread)是进程内的子任务.比如word中 ...

  3. 多线程python实现方式_python多线程的两种实现方式(代码教程)

    本篇文章给大家带来的内容是关于python多线程的两种实现方式(代码教程),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 线程是轻量级的进程,进程中可划分出多个线程,线程可独立的调度 ...

  4. python线程创建对象_python 多线程(一)

    (一) 线程概述 几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每个运行中的程序就是一个进程.当一个程序运行时,内部可能包含多个顺序执流,每个顺序执行流就是一个线程. 进程和线程 ...

  5. python技术简介_Python多线程技术简介,简单,阐述,python

    python多线程 python中创建多线程方法有两种,这里只介绍简单的一种: from threading import Thread #导入模块 import time def test(thre ...

  6. python 上市公司 概念股_python多线程和多进程获取所有上市公司的实时数据

    前一天简单介绍了python怎样获取历史数据和实时分笔数据,那么如果要获取所有上市公司的实时分笔数据,应该怎么做呢? 肯定有人想的是,用一个列表存储所有上市公司的股票代号,然后无限循环获取不就得了吗? ...

  7. python 两个[]_Python中的两个测试工具

    ♚ 作者:jclian,喜欢算法,热爱分享,希望能结交更多志同道合的朋友,一起在学习Python的道路上走得更远! 当我们在写程序的时候,我们需要通过测试来验证程序是否出错或者存在问题,但是,编写大量 ...

  8. python线程创建对象_Python多线程编程基础:如何创建线程?

    Python标准库threading中的Thread类用来创建和管理线程对象,支持使用两种方法来创建线程: 1)直接使用Thread类实例化一个线程对象并传递一个可调用对象作为参数: 2)继承Thre ...

  9. python线程数组_Python多线程

    多线程技术需.要用到threading模块,应当避免使用thread模块,原因是它不支持守护线程.当主线程退出时,所有的子线程不管他们是都还在工作,都会被强制退出.有时候我们并不希望发生这种行为 ,这 ...

  10. python线程监控_Python多线程的事件监控

    设想这样一个场景: 你创建了10个子线程,每个子线程分别爬一个网站,一开始所有子线程都是阻塞等待.一旦某个事件发生:例如有人在网页上点了一个按钮,或者某人在命令行输入了一个命令,10个爬虫同时开始工作 ...

最新文章

  1. 中国电子学会青少年编程能力等级测试图形化一级编程题:小狗进圈
  2. 【Java基本功】一文读懂String及其包装类的实现原理
  3. 使用OpenSSL进行RSA加密和解密(非对称)
  4. linux nfs 修复文件,linux nfs Read-only file system
  5. react学习(68)--ant design inputNumber
  6. 新驾考科目三有四个地方易犯错 多名教练提供对策
  7. phpcms上传php,phpcms如何上传视频
  8. 谈谈一些有趣的CSS题目(十二)-- 你该知道的字体 font-family
  9. java 删除文件路径下的指定文件
  10. 解决ASP.NET 安装完成报错500
  11. 使用树莓派打造家庭监控系统
  12. C语言零基础——简单门票费程序
  13. 安卓手机格式化怎么弄_一加6/7/7Pro怎么从氢OS安卓10降级安卓9系统-完美降级教程...
  14. linux 配制aria2
  15. 在excel的单元格中设置下拉菜单
  16. PPTV多屏互动服务器可以看文档吗,关闭或删除PPTv多屏互动服务器的方法
  17. Android——浙理体育(飞翔的红蜻蜓)反编译分析
  18. 如何使用万能的钢笔抠图
  19. 数据结构与算法分析(三)数组练习代码
  20. 基于bp的神经网络算法,bp神经网络是什么算法

热门文章

  1. C# Maximum request length exceeded. 产生错误的原因,以及解决方法.
  2. Oracle JOB 用法小结
  3. ASP.NET 参数传递,长度限制,及使用注意事项。
  4. 深入浅出Hadoop: 高效处理大数据
  5. java、js的编码、解码
  6. display:table-cell 在页面中的应用
  7. 使用工厂方法模式实现多数据库WinForm手机号码查询器(附源码)
  8. 关于oracle数据库高版本向低版本迁移的解决方法
  9. 在向服务器发送请求时发生传输级错误。
  10. Hibernate 简单的CURD操作