关于python3中的并发编程,经过这些天的学习,归纳如下:

#practice21:多线程

  1. 线程的定义

  • 方法一:直接Thread()构造
  • 方法二:构造Thread的子类
#多线程的使用
from  urllib.request import urlretrieve
import csv
from xml.etree.ElementTree import ElementTree,Element
from threading import Threaddef download(sid,filename):'''下载csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)def convert(filename):'''csv文件转换为xml文件'''with open(filename,'rt',encoding='GB2312')as rf:if rf:reader = csv.reader(rf)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('%s.xml' % filename,encoding='utf-8')def handle(sid):print("downloading %s :" % str(sid))download(sid,'demo%s.csv' % str(sid))print("converting %s :" % str(sid))convert('demo%s.csv' % str(sid))#方法一
threads = []
for i in range(1000001,1000010):#注意,args不能是(i),因为必须是元组t = Thread(target=handle,args=(i,))threads.append(t)t.start()#线程等待
for t in threads:t.join()print("main thread")#方法二
class Mythread(Thread):def __init__(self,sid):Thread.__init__(self)self.sid = siddef run(self):handle(self.sid)print('*'*20)
threads = []
for i in range(1000001,1000010):t = Mythread(i)threads.append(t)t.start()#线程等待
for t in threads:t.join()print("main thread")

执行结果:

downloading 1000001 :
downloading 1000002 :
downloading 1000003 :
downloading 1000004 :
downloading 1000005 :
downloading 1000006 :
downloading 1000007 :
downloading 1000008 :
downloading 1000009 :
converting 1000003 :
converting 1000006 :
converting 1000004 :
converting 1000009 :
converting 1000001 :
converting 1000005 :
converting 1000008 :
converting 1000002 :
converting 1000007 :
main thread
********************
downloading 1000001 :
downloading 1000002 :
downloading 1000003 :
downloading 1000004 :
downloading 1000005 :
downloading 1000006 :
downloading 1000007 :
downloading 1000008 :
downloading 1000009 :
converting 1000003 :
converting 1000002 :
converting 1000005 :
converting 1000004 :
converting 1000001 :
converting 1000009 :
converting 1000008 :
converting 1000006 :
converting 1000007 :
main thread
[Finished in 0.9s]

#practice22:线程间通信

  • .Queue,该队列是线程安全的;
  • 一个进程内的多个线程共用地址空间,这是线程间通信的基本依据;
  • 本例采用生产者/消费者模型,有多个生产者和一个消费者,每个生产者占用一个线程
  • 消费者只有一个,故必须使用循环来处理生产者生产的数据
from  urllib.request import urlretrieve
import csv
from xml.etree.ElementTree import ElementTree,Element
from threading import Thread
from queue import Queueclass DownloadThread(Thread):'''下载线程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid,filename):'''下载csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid,self.filename)self.queue.put(self.filename)class ConvertThread(Thread):'''转换现场'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef convert(self,filename):'''csv文件转换为xml文件'''with open(filename,'rt',encoding='GB2312')as rf:if rf:reader = csv.reader(rf)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('%s.xml' % filename,encoding='utf-8')def run(self):while True:filename = self.queue.get()if filename == False:breakprint("converting %s :" % str(filename))self.convert(filename)if __name__ == '__main__':#线程使用队列通信q = Queue()threads = []#创建并开启全部线程,包括9个下载线程和一个转换线程for i in range(1000001,1000010):t = DownloadThread(i,q)threads.append(t)t.start()ct = ConvertThread(q)ct.start()#等待下载线程完毕,通知转换线程结束for i in threads:i.join()q.put(False)

程序优化:(三处)
1. StringIO的使用替代文件
2. sid的构造
3. 列表推导式构造线程列表

from  urllib.request import urlretrieve
import csv
from xml.etree.ElementTree import ElementTree,Element
from threading import Thread
from queue import Queue
from io import StringIO
import requestsclass DownloadThread(Thread):'''下载线程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid):'''下载csv文件'''#变化3:使用rjust来调整字符串,使得sid只输入一到两位即可url = 'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(6,'0')response = requests.get(url)#变化1:用类文件对象(内存对象)来存储csv字符串数据,而非文件self.data = StringIO(response.text)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))class ConvertThread(Thread):'''转换现场'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef convert(self,sid,data):'''csv文件转换为xml文件'''#变化1:csv模块可直接使用stringio对象来获取readerif data:reader = csv.reader(data)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('1%s.xml' % str(sid).rjust(6,'0'),encoding='utf-8')def run(self):while True:sid,data = self.queue.get()if data == False:breakprint("converting %s :" % str(sid))self.convert(sid,data)if __name__ == '__main__':q = Queue()#变化2:使用列表推导式代替for循环,简化代码threads = [DownloadThread(i,q) for i in range(1,10)]for thread in threads:thread.start()ct = ConvertThread(q)ct.start()for i in threads:i.join()q.put((100,False))

感觉还不是很熟练,来个实例:程序设计要求如下:

1、调用OTCBTC的API,获取所有买家、卖家出价数据

2、涉及的币种有:EOS、ETH、BCH、NEO

3、将获取到的json数据转换成xml格式并保存

4、要求使用多线程

from threading import Thread
import requests
from xml.etree.ElementTree import ElementTree,Element
from queue import Queueclass DownloadThread(Thread):'''下载当前某种货币的卖单与买单'''def __init__(self,coin_id,queue):Thread.__init__(self)self.coin_id = coin_idself.queue = queueself.url = "https://bb.otcbtc.com/api/v2/depth?market=%s&limit=1000"self.url %= coin_iddef download(self,url):'''下载json数据,存储为data'''response = requests.get(url)return response.json()def run(self):print('downloading %s' % self.coin_id)data = self.download(self.url)self.queue.put((self.coin_id,data))class ConvertThread(Thread):'''把请求响应转化为xml文件'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef setchildtree(self,superelement_tag,spec_dict):'''构建asks tree或者bids tree. superelement_tag是子树的根节点名,spec_dict是整个json字符串转换后的python字典'''e =Element(superelement_tag)for list_item in spec_dict[superelement_tag]:e1 = Element('item')e.append(e1)e2_price = Element('price')e2_price.text = list_item[0]e1.append(e2_price)e2_volumn = Element('volumn')e2_volumn.text = list_item[1]e1.append(e2_volumn)return edef convert(self,coin_id,spec_dict):'''将请求响应body的字典转换为xml文件'''root = Element('data')e_timestamp = Element('timestamp')#必须在xml中把数字变成字符串!否则报错:TypeError: cannot serialize 1530197213 (type int),序列化错误!e_timestamp.text = str(spec_dict['timestamp'])root.append(e_timestamp)asks_childtree = self.setchildtree('asks',spec_dict)root.append(asks_childtree)bids_childtree = self.setchildtree('bids',spec_dict)root.append(bids_childtree)et = ElementTree(root)et.write('%s.xml' % coin_id)def run(self):while True:#获取队列中已经下载好的数据coin_id,data = self.queue.get()#判断队列是否已经收到哨符!if data == False:breakprint('converting %s' % coin_id)self.convert(coin_id,data)if __name__ == '__main__':queue = Queue()markets = ['eosbtc','ethbtc','bchbtc','neobtc']threads = [DownloadThread(market,queue) for market in markets]for thread in threads:thread.start()ct = ConvertThread(queue)ct.start()#等待所有下载线程完毕for thread in threads:thread.join()#添加终止convert线程的哨符queue.put(('xxx',False))

#practice23:线程间事件通知

1、 Event的使用
+ Event.wait与Event.set

from threading import Event,Thread
def f(e):print('hello')e.wait()print('world')e = Event()
t = Thread(target=f,args=(e,))
t.start()

可以看出,e.wait方法相当于阻塞函数,阻塞程序继续执行,直到等到触发信号e.set()

从运行框可以看出,程序并未执行完。

from threading import Event,Thread
def f(e):print('hello')e.wait()print('world')e = Event()
t = Thread(target=f,args=(e,))
t.start()
e.set()

由于e.set(),线程被触发继续执行,程序最后运行完退出。

  • Event.clear()

Event对象调用一对wait/set方法后就不能再次调用这对方法了,若想再次调用,必须先对Event对象调用clear方法!

from threading import Event,Thread
def f(e):while True:print('hello')e.wait()print('world')e = Event()
t = Thread(target=f,args=(e,))
t.start()
e.set()

由于e.set()使得线程内的阻塞函数e.wait()失效,故循环无限往复

from threading import Event,Thread
import time
def f(e):while True:print('hello')e.wait()e.clear()print('world')e = Event()
t = Thread(target=f,args=(e,))
t.start()
e.set()
time.sleep(1)
print('*'*40)
e.set()

主线程与子线程共同维护Event对象e

e.start()启动子线程,对应输出hello,然后开始阻塞;主线程e.set()结束子线程的阻塞,e.clear()使得e.start()可以重新生效,输出world与hello,然循环再次被e.wait()阻塞;

等待一秒,e.set()使得阻塞再次被解除!

2、 实例:

要求:
+ 多线程下载股票csv数据(生产者)
+ 单线程转换为xml文件(消费者)
+ 单线程打包xml文件(每当生成3个xml文件便打包为一个tar.gz包)

import csv
from xml.etree.ElementTree import ElementTree,Element
from threading import Thread,Event
from queue import Queue
from io import StringIO
import requests
import os
import tarfileclass DownloadThread(Thread):'''下载线程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid):'''下载csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(6,'0')response = requests.get(url)self.data = StringIO(response.text)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))class ConvertThread(Thread):'''转换线程'''def __init__(self,queue,cevent,tevent):'''转换线程与打包线程共同维护两个事件:转换事件cevent与打包事件tevent'''Thread.__init__(self)self.queue = queueself.cevent = ceventself.tevent = tevent#生成xml文件的计数器self.count = 0def convert(self,sid,data):'''csv文件转换为xml文件'''if data:reader = csv.reader(data)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('1%s.xml' % str(sid).rjust(6,'0'),encoding='utf-8')def run(self):while True:sid,data = self.queue.get()if data == False:#当终止哨符发出后,可能最后的xml文件不足3个,但也要打包#必须先设置终止信后,后开始打包global tarstoptarstop = Trueself.tevent.set()breakprint("converting %s :" % str(sid))self.convert(sid,data)#每转换一个xml文件,计数器加1self.count += 1if self.count == 3:self.count = 0#通知打包线程开始打包self.tevent.set()#停止转换,要想循环使用事件,clear()需要紧跟wait()#注意:必须先通知打包线程,再停止转换,反过来不行self.cevent.wait()self.cevent.clear()class TarThread(Thread):'''打包线程'''def __init__(self,cevent,tevent):'''转换线程与打包线程共同维护两个事件:转换事件cevent与打包事件tevent'''Thread.__init__(self)#tar包名称的初始idself.count = 0self.cevent = ceventself.tevent = tevent#任何一个循环执行的线程必须要有出口,设置为守护线程,主线程结束后,该线程自动退出,可能未完成打包任务!经测试不可行!# self.setDaemon(True)def tar(self):'''寻找当前文件夹下xml文件,生成打包文件,同时将源文件删除!'''self.count += 1filename = '%s.tar.gz' % str(self.count)with tarfile.open(filename,'w:gz') as tar:for file in os.listdir('.'):#注意函数名字:endswith不是endwithif file.endswith('.xml'):tar.add(file)os.remove(file)#如果当前文件夹下没有xml文件,但执行上一步任然会生成tar包,需要把空的tar包删除if not tar.members:os.remove(filename)def run(self):global tarstopwhile not tarstop and True:#阻塞等待打包命令,一旦阻塞被解除,执行完动作后应当立即调用clear(),使得下一次调用wait方法有效self.tevent.wait()self.tar()self.tevent.clear()#一旦打包完成,应当立即通知转换线程继续转换self.cevent.set()if __name__ == '__main__':#定义线程安全队列,用于下载与转换线程间通信dcqueue = Queue()tarstop = False#定义转换事件与打包事件cevent,tevent = Event(),Event()#定义下载、转换、打包线程threads = [DownloadThread(i,dcqueue) for i in range(1,11)]ct = ConvertThread(dcqueue,cevent,tevent)tt = TarThread(cevent,tevent)#开启所有线程for thread in threads:thread.start()ct.start()tt.start()#等待下载线程执行完毕,发出转换线程的终止哨符for i in threads:i.join()dcqueue.put((100,False))

不足之处:tar线程 最终的退出方式使用了全局变量,不太优雅;守护线程感觉又不满足条件


#practice24:线程池

concurrent.futures 函数库有一个 ThreadPoolExecutor 类,可以构建多线程(异步执行多个调用)。

1、 多线程的使用方法

from concurrent.futures import ThreadPoolExecutordef handle(a,b):print('hello world',str(a*b))return a*b
#构建多线程对象:executor
executor = ThreadPoolExecutor(max_workers=3)
#调用submit方法,提交任务给线程池,默认一次submit使用一个线程
#线程执行结果由Future对象保存
future = executor.submit(handle,3,4)
#调用result方法提取结果,如果线程未结束,则阻塞起来,直到有结果
result = future.result()
print(result)#除了submit,还有更高效的提交任务方法map,返回迭代器,每次迭代返回函数的执行结果,不是future对象
#使用3个线程,依次执行handle(1,1) handle(2,2) handle(3,3)
for result in executor.map(handle,[1,2,3],[1,2,3]):print(result)

2、实例

要求:
1. 构建echo TCP服务器,响应客户端的请求,即直接返回客户端发来的数据。
2. TCP服务器开启10个线程异步处理客户端请求。
3. 构建echo客户端,发送请求验证多线程。

  • 服务端
import socket
from concurrent.futures import ThreadPoolExecutorHOST = 'localhost'
PORT = 12345def handle_request(conn):with conn as subsock:while True:data = subsock.recv(1024)if not data:breaksubsock.sendall(data)def server(address):pool = ThreadPoolExecutor(10)ip,port = addresswith socket.socket() as s:s.bind(address)s.listen(5)while True:conn,address = s.accept()print('Client ' + ip + ":" + str(port) + ' connected')pool.submit(handle_request,conn)server(('',12345))
  • 客户端
import socketdef run_sockets(addr):with socket.socket() as s:s.connect(addr)s.sendall(b'hello world')data = s.recv(1024)print(data)for i in range(7):run_sockets(('localhost',12345))

【运行结果】

  • 客户端

  • 服务端

先运行服务端代码,作为服务器是无限循环,等待请求

sendall,recv,accept无数据时都会阻塞

必须先运行服务器代码,再运行多个客户端!


#practice25:多进程

1、 多进程的定义

  • 创建子进程
from multiprocessing import Processdef f(a,b):print(a*b)p = Process(target=f,args=(1,5))
p.start()
print('main process')
p.join()
print('main1 process')

  • 与线程的区别

    虽然进程与线程的很多方法相似,但最大的不同是,进程之间占用不同的地址空间。所以不能使用之前线程共用全局变量的方法进通信!

2、 进程间通信

  • 使用multiprocessing.Queue
from multiprocessing import Process,Queue,Pipe#进程安全Queue的基本使用
q = Queue()def f(q):print('hello')#当队列内容为空,get操作会阻塞!,直到传入dataprint(q.get())print('world')p = Process(target=f,args=(q,))
p.start()
q.put('yes it  is')

  • 使用multiprocessing.Pipe
from multiprocessing import Process,Pipedef f(c):#无数据会阻塞在这里data = c.recv()print(data)c1,c2 = Pipe()
p = Process(target=f,args=(c2,))
p.start()
c1.send('hello world')

3、 多进程使用场景:cpu密集型操作

from threading import Thread
from multiprocessing import Processdef isarmstrong(n):'''求n是不是水仙花数,返回bool结果(无须关注具体算法)'''a,t = [],nwhile t > 0:a.append(t % 10)t /= 10k = len(a)return sum(x * k for x in a) == ndef findarmstrong(a,b):'''在a-b间寻找水仙花树'''result = [x for x in range(a,b) if isarmstrong(x)]print(result)def run_multithreads(*args):'''采用多线程处理寻找水仙花树的任务,args传入的是多个查找范围'''threads = [Thread(target=findarmstrong,args=(a,b)) for a,b in args]for thread in threads:thread.start()for thread in threads:thread.join()def run_multiprocess(*args):'''采用多线程处理寻找水仙花树的任务,args传入的是多个查找范围'''proceses = [Process(target=findarmstrong,args=(a,b)) for a,b in args]for process in proceses:process.start()for process in proceses:process.join()if __name__ == '__main__':import timestart = time.time()# run_multiprocess((200000,300000),(300000,400000))run_multithreads((200000,300000),(300000,400000))end = time.time()print(end-start)

多进程明显比多线程快

python3练习题:并发编程(21-25)相关推荐

  1. java并发实战编程pdf_「原创」Java并发编程系列25 | 交换器Exchanger

    2020年Java面试题库连载中 [000期]Java最全面试题库思维导图 [001期]JavaSE面试题(一):面向对象 [002期]JavaSE面试题(二):基本数据类型与访问修饰符 [003期] ...

  2. python开发视频大全_2019年python开发编程21天快速入门视频教程+书籍大全和面试大礼包...

    极力推荐这套python资料,不是那种庞大的复杂的难以入门的课程,这套课程十分简单.其中一套21天入门python的课让你以最快的速度入门,加上另一套python资料包(其中包括了几十本python学 ...

  3. 并发编程 07—— 任务取消

    Java并发编程实践 目录 并发编程 01-- ThreadLocal 并发编程 02-- ConcurrentHashMap 并发编程 03-- 阻塞队列和生产者-消费者模式 并发编程 04-- 闭 ...

  4. Python3 与 C# 并发编程之~ 进程篇

    上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出- 如果遇到听不懂的可以看上一次的文章:https://www.cnblogs.com/dotne ...

  5. Python3 与 C# 并发编程之~进程先导篇

    在线预览:http://github.lesschina.com/python/base/concurrency/1.并发编程-进程先导篇.html Python3 与 C# 并发编程之- 进程篇:h ...

  6. python3 asyncio_asyncio--python3未来并发编程主流、充满野心的模块

    介绍 asyncio是Python在3.5中正式引入的标准库,这是Python未来的并发编程的主流,非常重要的一个模块.有一个web框架叫sanic,就是基于asyncio,语法和flask类似,使用 ...

  7. java编程的逻辑 京东,从阿里,京东等大厂面试题中提炼出25道最频繁出现的并发编程难题(附答案)...

    并发编程可以说是Java程序员必须掌握的技能之一,也是最难掌握的一种技能.它要求编程者对计算机最底层的运作原理有深刻的理解,同时要求编程者逻辑清晰.思维缜密,这样才能写出高效.安全.可靠的多线程并发程 ...

  8. Python3 与 C# 并发编程之~ Net篇

    NetCore并发编程 示例代码:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency 先简单说下概念(其实之前也有 ...

  9. python编程一球从100米_【Python3练习题 015】 一球从100米高度自由落下,每次落地后反跳回原高度的一半,再落下。求它在第10次落地时,共经过多少米?第10次反弹多高?...

    问题:一球从某高度自由落下,每次落地后反跳回原高度的一半:再落下,求它在第n次落地时,共经过多少米?第n次反弹多高? import java.util.Scanner; //题目:一球从100米高度自 ...

最新文章

  1. 【坑爹升级】更新NVIDIA GeForce GTX Ti最新驱动,突然屏幕亮瞎我24K钛合金双眼。导致Fn+F2/F3、win+x无法控制笔记本电脑显示器亮度调整, 电源选项屏幕亮度也不见了!
  2. 程序员应该如何自我驱动,迅速获得成长?
  3. Python 用 if __name__ == ‘__main__‘:语句来控制代码是被直接运行还是导包执行
  4. 计算2的N次方(信息学奥赛一本通-T1170)
  5. 高校网络中心主任挨骂冤不冤?
  6. oracle和mysql文件怎么打开_mysql与oracle数据库停止与打开的批处理文件
  7. Netty-2-服务端创建多个handler
  8. sqoop入门到熟悉
  9. EasyUI 1.5.1 美化主题大包 Insdep Theme 1.0.3 正式版已发布,开源下载
  10. 软件开发模式(瀑布、原型、增量、螺旋、敏捷开发)
  11. 对应阻尼下的开环增益matlab,自动控制原理实验指导书MATLAB版解析.doc
  12. 【深度学习】眼底图像之视盘和黄斑分割的探索
  13. Java学习笔记类对象多态继承(下)
  14. 蚀刻后残留物和光刻胶去除技术
  15. 日活、周活(周重活)、月活 统计
  16. C语言入门与进阶必备书
  17. 将同一文件夹下的图片转化为视频
  18. 图学习中的链路预测任务(持续更新ing...)
  19. 计算机等级一级考试上机试题,计算机等级考试一级上机试题
  20. 汽车滤纸-市场现状及未来发展趋势

热门文章

  1. Qt设置应用程序图标
  2. 经典的”服务器最多65536个连接”误解
  3. Spring Boot使用mongo的GridFS模块
  4. BZOJ3675: [Apio2014]序列分割
  5. 三、初识Socket套接字结构体
  6. RT-thread内核之空闲线程
  7. 2017软件工程第一次作业
  8. POJ 3320 尺取法,Hash,map标记
  9. PhpStorm 默认快捷键
  10. IIS配置不正确可能导致“远程服务器返回错误: (404) 未找到错误一例。