消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现

一、使用multiprocessing里面的Queue来实现消息队列

q = Queue()

q.put(data)  #生产消息

data = q.get() #消费消息

例子:

from multiprocessing import Queue, Processdef write(q):for i in ["a","b","c","d"]:q.put(i)print("put {0} to queue".format(i))def read(q):while 1:result = q.get()print("get {0} from queue".format(result))def main():q = Queue()  #定义一个消息队列容器pw = Process(target=write,args=(q,)) #定义一个写的进程pr = Process(target=read,args=(q,))  #定义一个读的进程pw.start()   #启动进程pr.start()pw.join()    pr.terminate()
if __name__ == "__main__":main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue

二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplux为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。

例子:

from multiprocessing import Process,Pipe
import timedef proc1(pipe):for i in xrange(1,10):pipe.send(i)time.sleep(3)print("send {0} to pipe".format(i))def proc2(pipe):n = 9while n>0:result = pipe.recv()time.sleep(3)print("recv {0} from pipe".format(result))n -= 1if __name__ == "__main__":pipe = Pipe(duplex=False)  #定义并实例化一个管道print(type(pipe))p1 = Process(target=proc1,args=(pipe[1],))   #pipe[1],管道的右边,表示进入端,发送数据p2 = Process(target=proc2,args=(pipe[0],))   #pipe[0],管道的左边,表示出口端,接收数据p1.start()p2.start()p1.join()p2.join()pipe[0].close()pipe[1].close()

运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe

三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put

以下两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作

例子:

from multiprocessing import Process, Pipe, Queue
import time
from threading import Threadclass Proceduer(Thread):def __init__(self,queue):super(Proceduer,self).__init__() # 超类self.queue = queue   #将queue赋给self.queue,便于类中其他函数调用def run(self):try:for i in xrange(1,10):print("put data is: {0} to queue".format(i))self.queue.put(i)except Exception as e:print("put data error")raise eclass Consumer_odd(Thread):def __init__(self,queue):super(Consumer_odd, self).__init__()self.queue = queuedef run(self):try:while self.queue.empty:   #判断消息队列是否为空number = self.queue.get()  #取到消息值if number%2 != 0:print("get {0} from queue ODD".format(number))else:self.queue.put(number)  #将信息放回队列中time.sleep(1)except Exception as e:raise eclass Consumer_even(Thread):def __init__(self,queue):super(Consumer_even,self).__init__()self.queue = queuedef run(self):try:while self.queue.empty:number = self.queue.get()if number%2 == 0:print("get {0} from queue Even,thread name is :{1}".format(number,self.getName()))else:self.queue.put(number)time.sleep(1)except Exception as e:raise edef main():queue = Queue()  #实例化一个消息队列p = Proceduer(queue=queue)  #消息队列作为参数赋值给生产者函数,并实例化p.start()   #启动一个带消息队列的函数p.join()    #等待结束time.sleep(1)c1 = Consumer_odd(queue=queue)   #消息队列作为参数赋值给消费者函数,并实例化c2 = Consumer_even(queue=queue)    #消息队列作为参数赋值给消费者函数,并实例化c1.start()c2.start()c1.join()c2.join()print("All threads terminate!")if __name__ == "__main__":main()

运行结果:

put data is: 1 to queue

put data is: 2 to queue

put data is: 3 to queue

put data is: 4 to queue

put data is: 5 to queue

put data is: 6 to queue

put data is: 7 to queue

put data is: 8 to queue

put data is: 9 to queue

get 1 from queue ODD

get 3 from queue ODD

get 4 from queue Even,thread name is :Thread-3

get 5 from queue ODD

get 7 from queue ODD

get 9 from queue ODD

get 2 from queue Even,thread name is :Thread-3

get 6 from queue Even,thread name is :Thread-3

get 8 from queue Even,thread name is :Thread-3

例子2:

import Queueq = Queue.Queue()for i in range(5):q.put(i)while not q.empty():print q.get()

运行结果:

0

1

2

3

4

转载于:https://blog.51cto.com/huangzp/2051897

python—多进程的消息队列相关推荐

  1. day41——多进程的消息队列、消息队列pipe

    多进程的消息队列 消息队列指的是消息在传输过程中保存消息的容器 消息队列最经典的用法是消费者和生产者之间通过消息管道来传递消息.消费者和和生产者是不同的进程,生产者往管道中写消息,消费者从管道中读消息 ...

  2. 基于Swoole和beanstalkd实现多进程处理消息队列。

    2019独角兽企业重金招聘Python工程师标准>>> 项目地址 SWBT框架 https://gitee.com/chenbotome/SWBT 目的 基于Swoole和beans ...

  3. 阿里云消息队列python_41. Python Queue 多进程的消息队列 PIPE

    消息队列: 消息队列是在消息传输过程中保存消息的容器. 消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程.生产者往管道中写消息,消费者从管道中读消息. 相当 ...

  4. Python通过amqp消息队列协议中的Qpid实现数据通信

    简介: 这两天看了消息队列通信,打算在配置平台上应用起来.以前用过zeromq但是这东西太快了,还有就是rabbitmq有点大,新浪的朋友推荐了qpid,简单轻便.自己总结了下文档,大家可以瞅瞅. A ...

  5. python 消息中间件_消息队列中间件 RabbitMQ 详细介绍——安装与基本应用(Python)...

    RabbitMQ 是当前最流行的消息中间件(Message Broker)之一,支持多种消息协议(如 AMQP.MQTT). 同时它也是一个轻量级的非常易于部署的开源软件,可以运行在当前大多数操作系统 ...

  6. python (高级消息队列)普通、进程、进程池的消息队列

    一.普通消息队列 from queue import Queue  这个是普通的队列模式,类似于普通列表,先进先出模式,get方法会阻塞请求,直到有数据get出来为止. import Queueq = ...

  7. Python Redis Stream 消息队列 消费组

    项目有用到消息队列来消费不断新增的任务,本来看到Redis有Pub Sub就没准备用kafka了,后来看了下Redis 5.0新加的Stream,感觉刚好符合项目要求,看下文档就直接用上了,类似一个简 ...

  8. Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

    @Author:Runsen 消息队列 消息队列让应用程序在用户请求之外异步执行称为任务的工作.如果应用程序需要在后台执行工作,它会将任务添加到任务队列中.这些任务稍后由工作服务执行. Celery ...

  9. php 多进程 消息队列,[PHP] 多进程通信-消息队列使用

    向消息队列发送数据和获取数据的测试 $key=ftok(__file__,'a'); //获取消息队列 $queue=msg_get_queue($key,0666); //发送消息 //msg_se ...

  10. Python之kafka消息队列操作入门

    1 kafka简介 1.1 什么是kafka kafka是一个分布式.高吞吐量.高扩展性的消息队列系统.kafka最初是由Linkedin公司开发的,后来在2010年贡献给了Apache基金会,成为了 ...

最新文章

  1. SoapUI实践:自动化测试、压力测试、持续集成
  2. linux 多线程条件变量,linux多线程之条件变量
  3. 手机端viewport的设置规范
  4. ubuntu20.04安装mysql教程
  5. Angularjs $http.post
  6. decode函数_Python 内置函数总一
  7. STM32网络电路设计
  8. mysql 5.6 json查询_mysql5.6及以下版本如何查询数据库里的json
  9. 远程连接Ubuntu服务器
  10. 机器学习实战 --- sklearn
  11. java读取和写入txt_Java读取和写入txt文件
  12. CPI、PPI、PMI
  13. 迪文屏幕T5UID3平台学习笔记零:迪文屏幕的学习和开发
  14. 使用Jimi处理图像
  15. k8s Container资源控制: requests和limits
  16. android真机测试什么不同,android真机测试闪退
  17. Python os.symlink创建软链接
  18. VMware ESXi 扩容后提示“无法打开虚拟机的电源,请确认该虚拟磁盘是适用“厚”选项创建的”等信息,执行VMDK 格式是 zeroedthick 还是 eagerzeroedthick
  19. Atcoder TOYOTA SYSTEMS Programming Contest 2021(AtCoder Beginner Contest 228) B - Takahashi‘s Secret
  20. 25k~50k,比特大陆招人啦!这次会是你吗?

热门文章

  1. ORA-30377 MV_CAPABILITIES_TABLE not found
  2. Django中QuerySet的结果是否为空的判断
  3. An App ID with Identifier 'com.XXX.XXX’ is not available. Please enter a different string.报错
  4. Android -- Layout布局文件里的android:layout_height等属性为什么会不起作用?
  5. 吧唧下工作的事情 - 记用canvas封装的小小控件
  6. MySQL 存储过程参数IN OUT INOUT区别
  7. chrome保护眼睛设置【转】
  8. React-Native集成dva.js
  9. Linux或Linux虚拟机桥接模式使用Python2认证Drcom
  10. VC中Radio控件的用法