Python实现并发编程(补充joblib&pymysql)

文章目录

  • 一. 并发编程知识点
    • 1.1. 为什么引入并发编程
    • 1.2. 如何选择多线程多进程多协程
      • 1.2.1. 什么是CPU密集型计算、IO密集型计算
      • 1.2.2. 多线程、多进程、多协程的对比
      • 1.2.3. 怎样根据任务选择对应技术?
    • 1.3. Python速度慢的原因
      • 1.3.1. Python慢的原因(动态,GIL)
      • 1.3.2. 全局解释器锁(Global Interpreter Lock,GIL)
      • 1.3.3. 怎样规避GIL带来的限制
  • 二. 并发编程实战
    • 2.1. 多线程加速爬虫
      • 2.1.1. <font color=blue>理解一下多线程(重要)
      • 2.1.2. Python创建多线程的方法
      • 2.1.3. 代码实现
    • 2.2. Python实现生产者消费爬虫
      • 2.2.1 多组件的Pipeline技术架构
      • 2.2.2 生产者消费者爬虫的架构
      • 2.2.3 多线程数据通信的queue.Queue
      • 2.2.4 代码实现
    • 2.3. 线程安全问题与解决
      • 2.3.1. 线程安全介绍
      • 2.3.2. Lock用于解决线程安全问题
      • 2.3.2. 代码实现
    • 2.4. 好用的线程池
      • 2.3.1. 线程池原理
      • 2.3.2. 使用线程池的好处
      • 2.3.3. ThreadPoolExecutor的使用语法
      • 2.3.4. 使用线程池改造爬虫代码
    • 2.5. 多进程multiprocessing加速<font color=blue>(重要)
      • 2.5.1. 有了多线程threading,为什么还要用多进程mutiprocessing?
      • 2.5.2. 多进程multiprocessing知识梳理
      • 2.5.3. 代码实战(单线程vs多线程vs多进程对比CPU密集型速度)
      • 2.5.4. <font color=blue>多进程加进度条bar
    • 2.6. Flask服务中使用进程池
    • 2.7. <font color=blue>异步IO实现并发爬虫(新知识asyncio)
    • 2.8. <font color=blue>异步IO使用信号量爬虫并发度
  • 三. 并发编程(基于joblib包)
  • 四. Python连接Mysql数据库

一. 并发编程知识点

1.1. 为什么引入并发编程

  • 场景1:一个网络爬虫,按顺序爬取花了1小时,采用并发下载减少到20分钟!
  • 场景2:一个APP应用,优化前每次打开页面需要3秒,采用异步并发提升到每次200毫秒;
  • 引入并发编程的目的就是为了提升程序的运行速度!

  • Python对并发编程的支持
  • 多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
  • 多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务
  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
  • 使用Lock对资源加锁,防止冲突访问
  • 便用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结東、获取结果
  • 使用subprocess启动外部程序的进程,并进行输入输出交互

1.2. 如何选择多线程多进程多协程

  • Python并发编程有三种方式:多线程Thread、多进程Process、多协程Coroutine
  • 多协程是新出的一个技术,性能会更好,但是要求库支持协程。

1.2.1. 什么是CPU密集型计算、IO密集型计算

1.2.2. 多线程、多进程、多协程的对比

1.2.3. 怎样根据任务选择对应技术?

1.3. Python速度慢的原因

1.3.1. Python慢的原因(动态,GIL)

  • 相比C/C+ +/JAVAPython确实慢,在一些特殊场景下,Python比C++慢100~200倍
  • 由于速度慢的原因,很多公司的基础架构代码依然用C/C++开发,比如各大公司阿里/腾讯/快手的推荐引擎、搜索引擎、存储引擎等底层对性能要求高的模块。

1.3.2. 全局解释器锁(Global Interpreter Lock,GIL)

  • 由于GIL的存在,即使电脑有多核CPU,单个时刻也只能使用1个,相比并发加速的C++/JAVA所以慢
  • Python这么牛逼的语言,为什么有GIL这个软肋的存在呢?下面解释一下:
  • 引入GIL是为了解决多线程之间数据完整性和数据同步的问题,Python中对象的管理是使用引用计数器进行的,引用数为0则释放对象
  • 真实模拟如下,线程A和线程B都引用了对象objobj,ref_num=2,假设某一个时刻线程A和B都想撤销对obj的引用。对于线程A来说,首先执行ref_num--,变成1,可是在操作系统执行的过程中线程可以随时发生切换,假如这个时候发生多线程调度切换,切换到了线程B,B首先将ref_num--,变成0,线程B检测到这个ref_num==0了,释放obj对象。这个时候又发生多线程调度切换,切换到了线程A,这个时候发现ref_num==0,这个时候obj在内存中已经不存在了,这个时候程序会报错。
  • 在此基础上Python进行了封装,解决了这些问题,这就是为什么GIL的存在了,它简化了Python对共享资源的管理

1.3.3. 怎样规避GIL带来的限制

二. 并发编程实战

2.1. 多线程加速爬虫

2.1.1. 理解一下多线程(重要)

  • 下面的运行时间为什么是0呢? 正常按照我们的理解,两个线程并行它应该是2s呀,是因为我们在运行.py文件的时候,我们这里创建了2个线程thread,还有一个线程就是主线程,虽然两个线程运行起来了,但是在下面脚本中运行了其它代码print(),它在主线程中运行的,所以一共有3个线程,其中有一个mainThread主线程,线程之间既然可以并行,就意味着程序thread1,thread2开始之后,在sleep之前就主线程仍然可以往下执行print。
  • 但是主线程执行完之后程序并没有退出,如果主线程退出的话,就不会打印出end了,主线程一旦退出,进程也就退出了,end是执行不到了。

import threading
import time# 比如爬虫,简单写一些
def get_detail_html(url):""" 模拟获取html,当成网络请求 """print("get detail html started")time.sleep(2)print("get detail html end")def get_detail_url(url):""" 模拟获取url"""print("get detail url started")time.sleep(2)print("get detail url end")if __name__ == '__main__':thread1 = threading.Thread(target=get_detail_html, args=("",), name="html")thread2 = threading.Thread(target=get_detail_url, args=("",), name="url")start_time = time.time()thread1.start()thread2.start()print("last time: {}".format(time.time() - start_time))
# 运行结果:
get detail html startedget detail url started
last time: 0.0003478527069091797get detail html end
get detail url end
  • 为了解决上面主线程和其它线程并行执行了,我们希望在两个线程执行完毕再执行主线程,这个时候只需要在前面加上 thread1.join(), thread2.join(),这样就会在此位置就行阻塞,它等待两个线程完成之后才会执行下面print时间的地方。

2.1.2. Python创建多线程的方法

  • 1. 准备一个函数
def my_func(a, b):do_craw(a, b)
  • 2. 怎样创建一个线程
import threadingt = threading.Thread(target=my_func, args(100, 200)) # 创建一个线程对象,args是一个元组
  • 3. 启动线程
t.start()
  • 4. 等待结束
t.join() # 等到线程结束

2.1.3. 代码实现

  • 单线程执行过程中是顺序执行的,多线程执行不是顺序执行的,系统按照自己的规则调度的!
import time
import requests
import threading
import functoolsurls = [f"https://www.cnblogs.com/#{page}"for page in range(1, 51)
]def craw(url):r = requests.get(url)# print(url, len(r.text))# craw(urls[0])def timer(func):""" 计时器装饰器 """@functools.wraps(func)def wrapper(*args, **kwargs):start = time.time()func(*args, **kwargs)end = time.time()print("{} takes {} seconds".format(func.__name__, end - start))return wrapper@timer
def single_thread():""" 单线程版本 """print("single_thread start")for url in urls:craw(url)print("single_thread end")@timer
def multi_thread():""" 多线程版本 """print("multi_thread start")thread_list = []# 1. 创建多个线程for url in urls:  # 对于每个url我都启动一个线程thread_list.append(threading.Thread(target=craw, args=(url,))  # 元组逗号)# 2. 启动线程for thread in thread_list:  # 启动50个线程thread.start()# 3. 等待结束for thread in thread_list:  # 等待结束thread.join()print("multi_thread end")if __name__ == '__main__':single_thread()multi_thread()# 执行结果
single_thread start
single_thread end
single_thread takes 5.113051891326904 seconds
multi_thread start
multi_thread end
multi_thread takes 1.73423171043396 seconds

2.2. Python实现生产者消费爬虫

2.2.1 多组件的Pipeline技术架构

  • 通常复杂的事情一般都不会一下子做完,而是会分很多中间步骤一步步完成,如下例子:由输入数据得到输出数据中间会经过很多的模块,而且模块之间会通过中间数据进行交互,这些不同的处理模块叫做处理器,比如编号1,编号X,编号N很多个。把一件事分成很多处理模块的架构叫做Pipeline,每个处理模块也有一个名字叫Process。
  • 其实生产者消费者就是典型的Pipeline,有2个角色生产者和消费者,生产者生产的结果会通过中间数据交给消费者进行消费。而生产者以数据数据作为原料,消费者以输出作为最终的数据数据。

2.2.2 生产者消费者爬虫的架构

  • 里面有2个Processor:
  • 第1个Processor生产者获取待爬取的URL进行网页下载,下载好的内容放到网页队列中。
  • 第2个Processor消费者从队列中获取数据,进行网页的解析,把结果进行存储,如商品价格、品牌。

2.2.3 多线程数据通信的queue.Queue

  • queue.Queue可以 用于多线程之间的、线程安全(线程安全指多个线程并发的访问数据不会出现冲突)的数据通信
# 1. 导入类库
import queue# 2. 创建Queue
q = queue.Queue()# 3. 添加元素
q.put(item)# 4. 获取元素
item = q.get()# 5. 查询状态
q.qsize()  # 元素的大小
q.empty()  # 判断是否为空
q.full()  # 判断是否已满

2.2.4 代码实现

  • 接着上一个例子,也是爬取网页,这里解析出文章的标题。
  • blog_spider.py如下
import time
import requests
import functools
from bs4 import BeautifulSoupurls = [f"https://www.cnblogs.com/#{page}"for page in range(1, 51)
]def craw(url):""" 爬取网页 """r = requests.get(url)return r.textdef parse(html):""" 解析标题 """# class="post-item-title"soup = BeautifulSoup(html, "html.parser")links = soup.find_all("a", class_="post-item-title")# 标题和链接元组return [(link["href"], link.get_text()) for link in links]def timer(func):""" 计时器装饰器 """@functools.wraps(func)def wrapper(*args, **kwargs):start = time.time()func(*args, **kwargs)end = time.time()print("{} takes {} seconds".format(func.__name__, end - start))return wrapperif __name__ == '__main__':for result in parse(craw(urls[1])):print(result)
  • producer_consumer_spider.py如下
import queue
import threading
import blog_spider
import time
import randomdef do_craw(url_queue: queue.Queue, html_queue: queue.Queue):"""生产者:这是一个processor,它有两个参数输入队列和输出队列"""while True:url = url_queue.get()html = blog_spider.craw(url)html_queue.put(html)print(threading.currentThread().name, f"craw={url}",f"url_queue.size={url_queue.qsize()}")  # 打印一下当前线程的名字time.sleep(random.randint(1, 2))def do_parse(html_queue: queue.Queue, fout):"""消费者:这是一个processor,两个参数,写入到一个文件中"""while True:html = html_queue.get()results = blog_spider.parse(html)for result in results:fout.write(str(result) + "\n")print(threading.currentThread().name, f"len(result)={len(result)}",f"url_queue.size={url_queue.qsize()}")  # 打印一下当前线程的名字time.sleep(random.randint(1, 2))if __name__ == '__main__':url_queue = queue.Queue()html_queue = queue.Queue()for url in blog_spider.urls:url_queue.put(url)# 新建3个生产者线程for idx in range(6):t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")t.start()# 新建2个消费者线程fout = open("o2.data.txt", "w")for idx in range(2):t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")t.start()
  • 最后程序卡住了,因为while True一直等待queue中的内容。
craw0 craw=https://www.cnblogs.com/#1 url_queue.size=44
craw1 craw=https://www.cnblogs.com/#2 url_queue.size=44
parse1 len(result)=2 parse0 len(result)=2 url_queue.size=44
url_queue.size=44
craw4 craw=https://www.cnblogs.com/#5 url_queue.size=44
craw2 craw=https://www.cnblogs.com/#3 url_queue.size=44
craw3 craw=https://www.cnblogs.com/#4 url_queue.size=44
craw5 craw=https://www.cnblogs.com/#6 url_queue.size=44
parse1 len(result)=2 url_queue.size=39
craw1 craw=https://www.cnblogs.com/#7 url_queue.size=39
craw2 craw=https://www.cnblogs.com/#9 url_queue.size=39
craw3 craw=https://www.cnblogs.com/#10 url_queue.size=39
craw5 craw=https://www.cnblogs.com/#11 url_queue.size=39
craw4 craw=https://www.cnblogs.com/#8 url_queue.size=39
parse0 len(result)=2 url_queue.size=38
craw0 craw=https://www.cnblogs.com/#12 url_queue.size=38
parse1 len(result)=2 url_queue.size=37
craw1 craw=https://www.cnblogs.com/#13 url_queue.size=34
craw2 craw=https://www.cnblogs.com/#14 url_queue.size=34
craw3 craw=https://www.cnblogs.com/#15 url_queue.size=34
craw5 craw=https://www.cnblogs.com/#16 url_queue.size=34
parse1 len(result)=2 url_queue.size=34
craw1 craw=https://www.cnblogs.com/#17 url_queue.size=31
craw4 craw=https://www.cnblogs.com/#18 url_queue.size=31
craw5 craw=https://www.cnblogs.com/#19 url_queue.size=31
parse0 len(result)=2 url_queue.size=30
craw0 craw=https://www.cnblogs.com/#20 url_queue.size=30
craw2 craw=https://www.cnblogs.com/#21 url_queue.size=28
craw3 craw=https://www.cnblogs.com/#22 url_queue.size=28
parse1 len(result)=2 url_queue.size=27
craw0 craw=https://www.cnblogs.com/#23 url_queue.size=26
craw4 craw=https://www.cnblogs.com/#25 url_queue.size=23
craw1 craw=https://www.cnblogs.com/#24 url_queue.size=23
craw3 craw=https://www.cnblogs.com/#26 url_queue.size=23
craw5 craw=https://www.cnblogs.com/#27 url_queue.size=23
parse0 len(result)=2 url_queue.size=23
parse1 len(result)=2 url_queue.size=23
craw0 craw=https://www.cnblogs.com/#28 url_queue.size=20
craw2 craw=https://www.cnblogs.com/#29 url_queue.size=20
craw1 craw=https://www.cnblogs.com/#31 url_queue.size=18
craw4 craw=https://www.cnblogs.com/#30 url_queue.size=18
craw3 craw=https://www.cnblogs.com/#32 url_queue.size=18
craw0 craw=https://www.cnblogs.com/#33 url_queue.size=15
craw2 craw=https://www.cnblogs.com/#34 url_queue.size=15
craw5 craw=https://www.cnblogs.com/#35 url_queue.size=14
craw4 craw=https://www.cnblogs.com/#36 url_queue.size=13
craw3 craw=https://www.cnblogs.com/#37 url_queue.size=13
parse0 len(result)=2 url_queue.size=13
parse1 len(result)=2 url_queue.size=13
craw0 craw=https://www.cnblogs.com/#38 url_queue.size=10
craw5 craw=https://www.cnblogs.com/#40 url_queue.size=8
craw1 craw=https://www.cnblogs.com/#39 url_queue.size=8
craw3 craw=https://www.cnblogs.com/#42 url_queue.size=8
craw4 craw=https://www.cnblogs.com/#41 url_queue.size=8
parse1 len(result)=2 url_queue.size=8
craw2 craw=https://www.cnblogs.com/#43 url_queue.size=7
craw0 craw=https://www.cnblogs.com/#44 url_queue.size=6
craw1 craw=https://www.cnblogs.com/#45 url_queue.size=4
craw4 craw=https://www.cnblogs.com/#46 url_queue.size=4
parse0 len(result)=2 url_queue.size=4
parse1 len(result)=2 url_queue.size=4
craw5 craw=https://www.cnblogs.com/#47 url_queue.size=3
craw3 craw=https://www.cnblogs.com/#48 url_queue.size=2
craw4 craw=https://www.cnblogs.com/#49 url_queue.size=1
parse0 len(result)=2 url_queue.size=1
parse1 len(result)=2 url_queue.size=1
craw2 craw=https://www.cnblogs.com/#50 url_queue.size=0
parse1 len(result)=2 url_queue.size=0
parse0 len(result)=2 url_queue.size=0

2.3. 线程安全问题与解决

2.3.1. 线程安全介绍

  • 线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成
  • 由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全

2.3.2. Lock用于解决线程安全问题

2.3.2. 代码实现

  • 运行结果:有时候执行成功,有时候执行失败,如果在下面加一句sleep它会一直出问题,因为sleep语句一定会导致当前线程的阻塞,进行线程的切换。
import threading
import timeclass Account:""" 银行账户 """def __init__(self, balance):self.balance = balancedef draw(account: Account, amount):""" 取钱 """if account.balance > amount:time.sleep(0.1)print(threading.currentThread().name, "取钱成功")account.balance -= amountprint(threading.currentThread().name, "余额", account.balance)else:print(threading.currentThread().name, "取钱失败, 余额不足")if __name__ == '__main__':account = Account(1000)ta = threading.Thread(target=draw, args=(account, 800), name="ta")tb = threading.Thread(target=draw, args=(account, 800), name="tb")ta.start()tb.start()ta.join()tb.join()
  • 增加锁lock
import threading
import timelock = threading.Lock()class Account:""" 银行账户 """def __init__(self, balance):self.balance = balancedef draw(account: Account, amount):""" 取钱 """with lock:if account.balance > amount:time.sleep(0.1)print(threading.currentThread().name, "取钱成功")account.balance -= amountprint(threading.currentThread().name, "余额", account.balance)else:print(threading.currentThread().name, "取钱失败, 余额不足")if __name__ == '__main__':account = Account(1000)ta = threading.Thread(target=draw, args=(account, 800), name="ta")tb = threading.Thread(target=draw, args=(account, 800), name="tb")ta.start()tb.start()ta.join()tb.join()

2.4. 好用的线程池

2.3.1. 线程池原理

  • 新建线程系统需要分配资源、终止线程 系统需要回收资源,

  • 如果可以重用线程,则可以减去新建/终止的开销(减少时间开销)
  • 线程池的原理本身就是基于重用线程这个原理来减少时间开销,具体如何流转的呢?
  • 线程池由两部分组成:提前建立好的线程,这些线程会被重复地使用,同时还有一个 任务队列 的概念! 当新来一个任务,首先不是一个一个的创建线程,而是先放进一个任务队列,咱们创建好的线程挨着取出任务进行依次的执行,执行好这个任务之后,它会取下一个任务进行执行。

2.3.2. 使用线程池的好处

2.3.3. ThreadPoolExecutor的使用语法

2.3.4. 使用线程池改造爬虫代码

import concurrent.futures
import blog_spider# craw
with concurrent.futures.ThreadPoolExecutor() as pool:""" 用法1:map方式, 很简单"""htmls = pool.map(blog_spider.craw, blog_spider.urls)htmls = list(zip(blog_spider.urls, htmls))for url, html in htmls:print(url, len(html))
print("craw over")# parse
with concurrent.futures.ThreadPoolExecutor() as pool:""" 用法1:feature方式,更强大"""features = dict()for url, html in htmls:feature = pool.submit(blog_spider.parse, html)  # 它是一个一个提交的features[feature] = url# for feature, url in features.items(): # 方式1:输出有顺序#     print(url, feature.result())for feature in concurrent.futures.as_completed(features):  # 方式2:输出无顺序url = features[feature]print(url, feature.result())print("parse over")

2.5. 多进程multiprocessing加速(重要)

2.5.1. 有了多线程threading,为什么还要用多进程mutiprocessing?

  • 全局解释器锁GIL是计算机程序设计语言解释器用于同步线程的一种机制,它使得 任何时刻仅有一个线程在执行即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。所以对于CPU密集型计算,多线程反而会降低执行速度!

2.5.2. 多进程multiprocessing知识梳理

  • multiprocessing模块就是python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行

2.5.3. 代码实战(单线程vs多线程vs多进程对比CPU密集型速度)

import time
import math
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorimport multiprocessingPRIMES = [1, 2, 4, 5, 6, 112272535095293] * 5def timer(func):""" 计时器装饰器 """@functools.wraps(func)def wrapper(*args, **kwargs):start = time.time()func(*args, **kwargs)end = time.time()print("{} takes {} seconds".format(func.__name__, end - start))return wrapperdef is_prime(num):""" 判断一个数是否为素数 """if num == 1:return False# 循环需要判断的次数for i in range(2, int(math.sqrt(num) + 1)):if num % i == 0:return Falsereturn True@timer
def single_thread():""" 单线程 """for num in PRIMES:is_prime(num)@timer
def multi_thread():""" 多线程 """with ThreadPoolExecutor() as pool:pool.map(is_prime, PRIMES)@timer
def multi_process1():""" 多进程 """with ProcessPoolExecutor() as pool:results = pool.map(is_prime, PRIMES)print(list(results))@timer
def multi_process2():pool = multiprocessing.Pool()results = pool.map(is_prime, PRIMES)print(list(results))pool.close()@timer
def multi_process3():pool = multiprocessing.Pool()results = []for num in PRIMES:# pool.apply_async(is_prime, (num,))results.append(pool.apply_async(is_prime, (num,)))print([result.get() for result in results])# print(list(results))if __name__ == '__main__':single_thread()multi_thread()multi_process1()multi_process2()multi_process3()# 运行结果
single_thread takes 7.030707120895386 seconds
multi_thread takes 6.476134300231934 seconds
[False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True]
multi_process1 takes 2.2322838306427 seconds
[False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True]
multi_process2 takes 2.474424123764038 seconds
[False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True, False, True]
multi_process3 takes 2.73321795463562 seconds

2.5.4. 多进程加进度条bar

  • Python多进程打印进度条:https://cloud.tencent.com/developer/article/1617778

2.6. Flask服务中使用进程池

  • 当你定义这个pool的时候,它所依赖的这些函数必须都已经声明完了,所以process_pool = ProcessPoolExecutor()必须放在最下面,所有函数都声明完,才能正常使用。同时还要定义在if __name__ == '__main__':函数中。
import math
import json
import flask
from concurrent.futures import ProcessPoolExecutorapp = flask.Flask(__name__)def is_prime(num):""" 判断一个数是否为素数 """if num == 1:return False# 循环需要判断的次数for i in range(2, int(math.sqrt(num) + 1)):if num % i == 0:return Falsereturn True@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):number_list = [int(x) for x in numbers.split(",")]  # 这是cpu密集型计算,多进程加速results = process_pool.map(is_prime, number_list)return json.dumps(dict(zip(number_list, results)))if __name__ == '__main__':process_pool = ProcessPoolExecutor()app.run()

  • 注意对于多线程的使用其实非常灵活,你定义在哪里都可以,比较灵活,因为它共享当前进程的所有环境。但是在多进程在使用的过程中就遇到一些问题。而flask框架中使用多进程的方式在main函数里面,在app.run()之前初始化进程池,然后在所有的函数里就可以使用这个pool.map。

2.7. 异步IO实现并发爬虫(新知识asyncio)

  • 单线程爬虫的执行路径

  • 其中 超级循环不会一直等待,而是会执行下一个任务。

import time
import asyncio
import aiohttp
import blog_spider# 1. 定义协程,注意异步的实现前面都加上了async
async def async_craw(url):print("cral url: ", url)async with aiohttp.ClientSession as session:  # 定义一个异步对象async with session.get(url) as response:result = await response.text()  # 获取内容,刚才提到await时候,超级循环不会一直等待,而会执行下一个任务print(f"craw url: {url}, {len(result)})")  # 这样的话携程就开发完了,协程就是在异步io中执行该函数# 2. 获取事件循环
loop = asyncio.get_event_loop()# 3. 创建task列表
tasks = [loop.create_task(async_craw(url))  # 对每个url创建一个taskfor url in blog_spider.urls
]start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
print("cost time is: ", time.time() - start)

2.8. 异步IO使用信号量爬虫并发度

import time
import asyncio
import aiohttp
import blog_spidersemaphore = asyncio.Semaphore(10)  ####### 信号量,并发度设置10# 1. 定义协程,注意异步的实现前面都加上了async
async def async_craw(url):async with semaphore:  #######print("cral url: ", url)async with aiohttp.ClientSession as session:  # 定义一个异步对象async with session.get(url) as response:result = await response.text()  # 获取内容,刚才提到await时候,超级循环不会一直等待,而会执行下一个任务await asyncio.sleep(5)print(f"craw url: {url}, {len(result)})")  # 这样的话携程就开发完了,协程就是在异步io中执行该函数# 2. 获取事件循环
loop = asyncio.get_event_loop()# 3. 创建task列表
tasks = [loop.create_task(async_craw(url))  # 对每个url创建一个taskfor url in blog_spider.urls
]start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
print("cost time is: ", time.time() - start)

三. 并发编程(基于joblib包)

  • Python的并行远不如Matlab好用。比如Matlab里面并行就直接把for改成parfor就行(当然还要注意迭代时下标的格式),而Python查 一查并行,各种乱七八糟的方法一大堆,而且最不爽的一点就是只能对函数进行并行。
  • 大名鼎鼎的sklearn里面集成了很方便的并行计算,仔细查看其代码发现就是用 joblib实现 的,并且用法还挺巧
# !/usr/bin/env python
# -*- encoding: utf-8 -*-
import time
import numpy as np
from numba import jit
from joblib import Parallel, delayed# @jit(nopython=True)  # jit,numba装饰器中的一种
def test_mvar(a, b):a = np.random.rand(10, 10)b = np.random.rand(10, 10)return np.matmul(a, b)start = time.time()
maxlen = 100000
# 指定5个CPU(默认是分配给不同的CPU), 一般而言,直接采用默认方式loky就好。
parallel = Parallel(n_jobs=5, backend='multiprocessing')  # 提供的有 multiprocessing, threading, finally
# delayed(sqrt)表示要用的函数是sqrt,这里这种用法就非常类似C++里面的委托(delegate)。
out = parallel(delayed(test_mvar)(i, j) for i, j in zip(np.random.rand(maxlen), np.random.rand(maxlen))
)
# print(out)
print(time.time() - start)# multiprocessing耗时: 3.5902414321899414
# loky耗时: 10.363945722579956
  • numba支持的东西很有限,意义不大。

  • Python之并行–基于joblib:https://blog.csdn.net/cauchy7203/article/details/107545490
  • numba,让python速度提升百倍:https://zhuanlan.zhihu.com/p/78882641

四. Python连接Mysql数据库

  • Python连接Mysql数据库:https://blog.csdn.net/Kqs19513500676/article/details/123750419
  • 【Python之pymysql库学习】4.数据表插入多条数据(保姆级图文+实现代码):https://blog.csdn.net/u011027547/article/details/122520776
# !/usr/bin/env python
# -*- encoding: utf-8 -*-
"""====================================
@Project: wuhan
@Author : kaifang zhang
@Date   : 2022/11/21 15:06
@Contact: kaifang.zkf@dtwave-inc.com
====================================="""
import pymysql
import datetime
import pandas as pdchinese2englist = {'合江门': 'yibin', '白沙湾': 'jiangan', '泸州站': 'luzhou', '朱沱站': 'zhutuo', '寸滩站': 'cuntan','长寿站': 'changshou', '涪陵站': 'fuling', '万州站': 'wanzhou', '奉节站': 'fengjie', '秭归': 'zigui','宜昌': 'yichang', '李家渡': 'zhijiang', '沙市(杨陵矶)': 'shashi', '郝穴': 'zhougongdi', '监利': 'jianli','城陵矶': 'chenglingji', '莫家河': 'mojiahe', '汉口': 'hankou', '黄石': 'huangshi', '九江': 'jiujiang','安庆': 'anqing', '铜陵': 'tongling', '芜湖': 'wuhu', '南京': 'nanjing', '镇江': 'zhenjiang'
}# 打开数据库连接
db = pymysql.connect(host='192.168.90.202',port=3306,user='root',passwd='root@1298',  db='waterPredict',charset='utf8')
# 使用cursor()方法获取操作游标
cursor = db.cursor()def select_data(start, end):""" 查询指定日期的数据 """sql = '''select * from bas_water_level_copy1where measure_time between '{}' and '{}''''.format(start, end)try:cursor.execute(sql)  # 执行SQL语句results = cursor.fetchall()  # 获取所有记录列表for row in results:print(row)except pymysql.MySQLError as e:print(f"查询数据失败,{e}")def delete_data(start, end):""" 删除指定当天的数据 """sql = '''delete from bas_water_level_copy1 where measure_time between '{}' and '{}''''.format(start, end)try:cursor.execute(sql)  # 执行sql语句db.commit()  # 提交事务print('删除数据成功')except pymysql.MySQLError as e:db.rollback()print(f'删除数据失败,{e}')def insert_data(data):""" 批量添加数据到mysql数据库中 """sql = '''insert into bas_water_level_copy1 (site_name,measure_time,measure_water_85,site_name_en)value (%s,%s,%s,%s)'''try:cursor.executemany(sql, data)  # 执行sql语句db.commit()  # 提交到数据库执行print('插入多条数据成功')except pymysql.MySQLError as e:db.rollback()  # 如果发生错误则回滚print(f'插入多条数据失败,{e}')def read_xlsx(filepath):""" 读取当天水位数据 """today_data = pd.read_excel(filepath)[['站点名称', '测量时间', '输出水位85']]today_data = [row + [chinese2englist[row[0]]] for row in today_data.values.tolist()]return today_datadef main():# ------ 获取当前的时间now = datetime.datetime.now()today = now.strftime("%F %T")[:10]yesterday = (now - datetime.timedelta(days=1)).strftime("%F %T")[:10]filepath = "final_" + today + ".xlsx"  # 文件路径start = yesterday + " 09:00:00"  # 开始日期end = today + " 09:00:00"  # 结束日期# ------ 读取数据data_list = read_xlsx(filepath)# ------ 查询数据select_data(start, end)# ------ 删除数据delete_data(start, end)# # ------ 插入数据insert_data(data_list)db.close()  # 关闭数据库连接if __name__ == '__main__':main()

『Python学习笔记』Python实现并发编程(补充joblibpymysql)相关推荐

  1. 『Python学习笔记』Python中的异步Web框架之fastAPI介绍RestAPI

    Python中的异步Web框架之fastAPI介绍&RestAPI 文章目录 一. fastAPI简要介绍 1.1. 安装 1.2. 创建 1.3. get方法 1.4. post方法 1.5 ...

  2. 『Python学习笔记』Mac系统汇总:终端文件显示不同颜色Pycharm代码模板JAVA环境变量

    Mac系统:终端文件显示不同颜色&Pycharm代码模板&JAVA环境变量! 文章目录 一. 显示不同颜色 二. mac电脑怎么设置触控板拖动 三. mac系统中termius终端中文 ...

  3. python基础论文_北大博士Python学习笔记,Python基础语法总结,一篇文章带你入门...

    image.png 网上现在Python学习资料有很多,但是很杂.很多初学Python的朋友就不知道该怎么去抉择,那些是自己当下所需要的. 刚好朋友是北大的博士,在IT行业也工作八年了.就把他学习Py ...

  4. 2022年最高效,踏实的Python学习笔记以及Python学习规划

    一.学习建议 1.找到自己感兴趣的方向,并且结合市场需求进行选择 Python的应用范围 测试 运维 web 人工智能 大数据 爬虫及数据分析 办公自动化 2.学习过程中一定要勤加练习,并且尝试去使用 ...

  5. python面向对象编程72讲_2020-07-22 Python学习笔记27类和面向对象编程

    一些关于自己学习Python的经历的内容,遇到的问题和思考等,方便以后查询和复习. 声明:本人学习是在扇贝编程通过网络学习的,相关的知识.案例来源于扇贝编程.如果使用请说明来源. 第27关 类与面向对 ...

  6. Python学习笔记1 Python基础

    第1章 Python基础 1.1 Python概述及版本介绍 1.Python是一种面向对象的解释型计算机程序设计语言(解释型语言源代码->解释器逐行解释每一句源代码) 2.优点:高级语言.可移 ...

  7. Python学习笔记: Python 标准库概览二

    本文来自:入门指南 开胃菜参考:开胃菜 使用Python解释器:使用Python解释器 本文对Python的简介:Python 简介 Python流程介绍:深入Python 流程 Python数据结构 ...

  8. Python学习笔记: Python 标准库概览

    本文来自:入门指南 开胃菜参考:开胃菜 使用Python解释器:使用Python解释器 本文对Python的简介:Python 简介 Python流程介绍:深入Python 流程 Python数据结构 ...

  9. Python学习笔记(一):编程和Python

    计算机是怎么工作的 计算机有很多很多的电路.在最底层,这些电路是一些开关.工程师和计算机科学家们使用"1"和"0"来代表开关的"开"和&qu ...

最新文章

  1. 梅宏院士:操作系统变迁有20年周期律,泛在计算是一片新蓝海
  2. Redis 过期键删除策略
  3. 江西财经计算机应用杨教授,江西财经大学信息管理学院研究生导师介绍:刘德喜...
  4. Erlang的散列数据结构
  5. sql日期相关函数的使用方法
  6. 在SAP云平台测试帐号中启用Kyma运行环境
  7. git status查看文件的状态
  8. Oracle官方推荐的性能测试工具!简单、精准又直观!
  9. sql IDENTITY(函数)
  10. 四川大学计算机组成原理实验报告_20092014年计算机组成原理考研真题与解析
  11. Oracle 密码过期问题
  12. 重大改革!Python将被加入高考科目!
  13. 网易校招2018----题目2----相反数
  14. .NET Compact Framework 2.0 SP1 发布了
  15. Idea打包Jar文件
  16. 对计算机病毒的防治也,对计算机病毒及防范对策研究.doc
  17. 初识Python自动化运维(一)
  18. Burpsuite 上
  19. Can't connect to MySQL server on 'xxx.xxx.xxx.xxx' (10061),错误代码2003
  20. CentOS 7 YUM安装MariaDB 10.1

热门文章

  1. Web项目 - 登录注册业务逻辑
  2. ChatGPT背后的经济账
  3. mysql无法启动以及cmd下mysql命令无法识别的
  4. 数据结构与算法图解——树
  5. 入手评测 r5 5600x和r7 5700g选哪个好
  6. 人力资源(HR)软件的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告
  7. 【JavaWeb】Http请求报文详解
  8. MinIO高性能对象存储
  9. UNIX痛恨者手册[转载]
  10. java工具类编写:工具类如何与业务代码完全分开