先说结论:Pika is not thread safe. Use a BlockingConnection per-thread。

即 Pika 并不是线程安全的,应该在每个线程里,都使用各种的 BlockingConnection

相关 issue:https://github.com/pika/pika/issues/1237

示例一:线程外创建 connection,线程里创建 channel

来源:https://github.com/pika/pika/issues/1237

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wuimport pika
import threadingconnection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))def loop1():channel = connection.channel()channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(1)def loop2():channel = connection.channel()channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(2)threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

1
Exception in thread LoopThread:
Traceback (most recent call last):File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_innerself.run()File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in runself._target(*self._args, **self._kwargs)File "/Users/wu/Work/youmi/projects/ym-crawler-ccs/dataparser/test12.py", line 19, in loop2channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publishself._flush_output()File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: IndexError('pop from an empty deque')

示例二:线程外创建 connection 和 channel

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wuimport pika
import threadingconnection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
channel = connection.channel()def loop1():channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(1)def loop2():channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(2)threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

Exception in thread LoopThread:
Traceback (most recent call last):File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_innerself.run()File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in runself._target(*self._args, **self._kwargs)File "/Users/wu/Work/youmi/projects/ym-crawler-ccs/dataparser/test12.py", line 13, in loop1channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publishself._flush_output()File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -24, 1))

示例三:线程里创建 connection 和 channel

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wuimport pika
import threadingdef loop1():connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))channel = connection.channel()channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(1)def loop2():connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))channel = connection.channel()channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(2)threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

2
1

可见,在线程里创建 connection 和 channel 是正常的,且线程执行顺序不一定。但是有个问题,我们不可能在每个线程里都创建一次 connection 和 channel ,这样其实是会浪费 cpu 的

示例四:线程加锁

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wuimport pika
import threading
import timelock = threading.Lock()connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
channel = connection.channel()def loop1():with lock:channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(1)time.sleep(2)def loop2():with lock:channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(2)threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

输出

1
2

加锁之后,代码也是正常执行的,但是有个问题,加锁后,也起不到线程的所用了,且很可以明显看到, loop2 一定是在 loop1 之后执行的,且会受到 loop1 的阻塞影响。因为用了锁之后,loop1 函数的 with lock 下的全部逻辑可以看成是一个原子,整个原子被锁住了。与 python 多线程的 GIL 不一样,python 的 GIL 可能是在某个地方锁住的,这里的是在哪块逻辑下加锁,哪块就会被锁。

很明显,这种加锁起不到多线程的作用,也不是我们要的。

示例五:使用线程局部变量

关于线程局部变量:使用 threading 模块中的 local() 函数,可以为各个线程创建完全属于它们自己的变量(又称线程局部变量)。正是由于各个线程操作的是属于自己的变量,该资源属于各个线程的私有资源,因此可以从根本上杜绝发生数据同步问题。

使用线程池 + 线程局部变量

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 15:37
# @Author : wu
import pika
import threading
from concurrent.futures import ThreadPoolExecutorlocal = threading.local()def init():c = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))channel = c.channel()return channeldef loop1(n):if not hasattr(local, 'channel'):channel = init()thread_id = threading.currentThread().identprint(f'线程:{thread_id} 创建 channel')local.channel = channellocal.channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(n, end='\n')with ThreadPoolExecutor(max_workers=5) as t:for i in range(10):t.submit(loop1, i)

输出

线程:123145590636544 创建 channel
2
5
6
7
8
9
线程:123145580126208 创建 channel
线程:123145574871040 创建 channel
1
0
线程:123145611657216 创建 channel
线程:123145601146880 创建 channel
4
3

可以看到,由于使用了线程池,且设置 max_workers=5,所以最多会有5个线程。而每个线程因为都有自己的局部变量锁,互不影响,因此需要分别创建5个 pika 的 channel 通道,但这样带来的好处就是,channel 的创建次数只会跟 max_workers 一致,因为在线程池中,一个线程执行任务后,会继续执行其他任务,还是同一个线程,而局部变量中已经存储了 channel这个值,因此可重复使用。

更重要的是,由于每个线程,都是自己创建的channel,互补影响,因此是安全的。这就达到了我们,既想线程安全,又不想每次都频繁创建 connection 和 channel,我们需要做的只是,控制好线程池的数量即可。

示例六: 使用 deferToThread

# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 15:37
# @Author : wu
import pika
import threadingfrom twisted.internet import reactor
from twisted.internet.threads import deferToThreadlocal = threading.local()def init():c = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))channel = c.channel()return channeldef loop1(n):thread_id = threading.currentThread().identif not hasattr(local, 'channel'):channel = init()print(f'线程:{thread_id} 创建 channel')local.channel = channellocal.channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')print(n, end='\n')return f'线程:{thread_id}发送成功'def pprint(res):print(res)def run():for i in range(50):d = deferToThread(loop1, i)d.addCallback(pprint)if __name__ == '__main__':run()reactor.run()

这个示例与上一示例基本一致,只是将 concurrent.futures.ThreadPoolExecutor 换成了 twisted 的 reactor 和 deferToThread

这种方式,可创建一个 reactor 的环,通过将 deferToThread 的实例加入 reactor 去执行,然后成功后回调结果,这也是异步的一种方式。

总结

  • Pika 并不是线程安全的,应该在每个线程里,都使用各种的 BlockingConnection
  • 为了避免每次都创建 connection,在多线程中,最好是使用线程池+ threading.local() 结合使用,线程池可以避免线程的频繁创建,threading.local()避免了pika connection 的频繁创建。参考实例五

来源:http://www.jayden5.cn/2020/11/24/pika-%E7%BA%BF%E7%A8%8B%E4%B8%8D%E5%AE%89%E5%85%A8/

[1005]pika 线程不安全相关推荐

  1. 多线程编程指南 part 2

    多线程编程指南 Sun Microsystems, Inc. 4150 Network Circle Santa Clara, CA95054 U.S.A. 文件号码819–7051–10 2006 ...

  2. pika主从同步原理

    pika主从同步 主要为了分析探索一下pika是如何实现主从同步的,pika的主从同步的原理与redis的同步方案还不相同,本文主要是为了分析其主从同步的相关流程(pika基于3.4版本). pika ...

  3. amqp协议与pika库浅析

    AMQP协议 简介 高级消息队列协议使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能. 为了完全实现消息中间件的互操作性,需要充分定义网络协议和消息代理服务的功能语义. 一套确定的消 ...

  4. Linux学习之系统编程篇:练习验证线程共享全局变量

    #include <pthread.h> #include <stdio.h> #include <unistd.h> int var = 1001; void * ...

  5. RabbitMQ/pika模块

    简介 MessageQueue用于解决跨进程.跨线程.跨应用.跨网络的通信问题. RabbitMQ使用erlang开发,在windows上使用时要先安装erlang. 官方的示例比较容易理解,可以点这 ...

  6. pika集群水平扩展——让性能容量不再受限

    女主宣言 Pika是一个可持久化的大容量redis存储服务,兼容string.hash.list.zset.set的绝大部分接口(兼容详情),解决redis由于存储数据量巨大而导致内存不够用的容量瓶颈 ...

  7. Java多线程系列--【JUC线程池 02】- 线程池原理(一)

    参考:http://www.cnblogs.com/skywang12345/p/java_threads_category.html 概要 在前面一章"Java多线程系列--"J ...

  8. Pika使用入门(一)【介绍】

    介绍 Pika是AMQP 0-9-1协议的纯Python实现,试图保持与底层网络支持库相当独立. 支持Python 2.6+和3.3+.由于线程不适合每一种情况,它不需要线程. 也不要禁止他们. gr ...

  9. python 使用pika对接rabbitMQ

    1.简易阐述原理 原则上,消息,只能有交换机传到队列,就像我们家里面的交换机道理一样. 有多个设备连接到交换机,那么,这个交换机把消息发给那个设备呢,就是根据交换机的类型来定.类型有:direct\t ...

最新文章

  1. vue项目打包之后原本好的样式变得不好了的原因分析
  2. Java互联网架构-京东国美高并发核心技术“秒杀”
  3. 接口管理平台DOClever5.2.0 发布,大幅增强自动化测试,支持可视化UI
  4. SpringBoot测试时出现Whitelabel Error Page
  5. python3基础:字符串、文本文件
  6. 【2017年第4期】大数据平台的基础能力和性能测试
  7. 【英语学习】【WOTD】cubit 释义/词源/示例
  8. qlv视频转换器免费版_迅捷视频转换器无法转换腾讯视频怎么办?亲测操作快速转换...
  9. oracle undoautotune,Oracle 隐藏参数:_undo_autotune、一个吃力不讨好的活
  10. 每日一句20191229
  11. 安装卸载Windows服务方法(2种方法)
  12. python程序设计pdf机械出版_Python程序设计
  13. 开源电子海图和webGIS
  14. 学习微信开发公众号的第一天(根据文字自动回复文字)
  15. 使用Excel生成符合正态分布的随机数
  16. Linux服务器监控性能测试
  17. 什么是数据库分组查询(详解)
  18. Equalize the Array(思维)
  19. php 中margin-top,margin-top是什么意思-css编程词典-php中文网
  20. C语言:围圈报数游戏

热门文章

  1. SEO人员快速提升关键词优化排名的方法
  2. 启明欣欣STM32开发板 --- 运行LWIP (无RTOS)
  3. 不要随便设置随机种子
  4. 《UNIX网络编程》第一步:编写自己的daytime客户端,并从daytime服务器获取时间
  5. 分享电脑日常使用的小技巧
  6. 医学图像处理——DeepDrr工具CT生成DRR
  7. apple pay充游戏后退款_2019,7月苹果王者荣耀退款
  8. win10系统如何看服务器地址,win10查看电脑DNS服务器地址具体步骤
  9. 销量惨淡,广告费ACOS飙升
  10. 6-1 判断顺序表是否有序(Java语言描述 ) (15 分)