一、RabbitMQ

python的Queue与RabbitMQ之间的理解:

python的进程或线程Queue只能python自己用。RabbitMQ队列多个应用之间共享队列,互相通信。

1、简单的实现生产者与消费者

  生产者

  (1)建立socket连接;(2)声明一个管道;(3)声明队列(queue);(4)通过管道发消息;(5)routing_key(queue名字);(6)body(内容)

  消费者

  (1)建立连接;(2)声明管道;(3)声明队列;(4)消费者声明队列(防止生产者后启动,消费者报错);(5)消费消息;(6)callback如果收到消息就调用函数处理消息 queue队列名字;

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/import pika
#建立socket连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()
#声明一个队列
channel.queue_declare(queue='hello')
#通过管道发消息,routing_key 队列queue名字 ,body发送内容
channel.basic_publish(exchange='',routing_key='hello',body='Hello World! 1 2')
print("[x] send 'Hello World! 1 2 '")
connection.close()

producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/import pika,time
#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()
#声明队列,防止生产者(发送端)没开启,消费者端报错
channel.queue_declare(queue='hello')
#ch管道的内存对象地址,如果收到消息就调用函数callback,处理消息
def callbak(ch,method,properties,body):print("[x] Received %r " % body)# time.sleep(30)
#消费消息
channel.basic_consume(callbak,queue='hello',no_ack=True #消息有没处理,都不给生产者发确认消息
                      )
print('[*] Waitting for messages TO exit press ctrl+c')
channel.start_consuming() #开始

consumer

2、消费者对生产者,可以1对多,而且默认是轮询机制

no_ack=True如果注释掉的话,消费者端不给服务器端确认收到消息,服务器端就不会把要发的消息从队列里清除

如下图注释了no_ack,加了一个时间,

开启三个消费者,一个生产者,生产者只send一次数据,挨个停止consumer,会发现同一条消息会被重新发给下一个consumer,直到producer收到consumer的确认收到的消息

3、队列查询

清除队列消息

4、消息持久化

(1)durable只是队列持久化

channel.queue_declare(queue='hello',durable=True)

生产者和消费者都需要添加durable=True

(2)要实现消息持久化,还需要

5、消息(1对多)实现权重功能

消费者端添加在消费消息之前

channel.basic_qos(prefetch_count=1)

6、广播消息fanout(纯广播)订阅发布

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')
#message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!2"channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)connection.close()

fanout_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("random queuename",queue_name)channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

fanout_consumer

7、direct广播模式(有选择性的发送接收消息)

import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

direct_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queueseverities = sys.argv[1:]
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(severities)
print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

direct_consumer

8、更细致的消息判断 type = topic

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

topic_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuebinding_keys = sys.argv[1:]
if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

topic_consumer

 
 

转载于:https://www.cnblogs.com/willpower-chen/p/5977633.html

python运维开发之第十一天(RabbitMQ,redis)相关推荐

  1. Python运维开发基础01-语法基础【转】

    开篇导语 整个Python运维开发教学采用的是最新的3.5.2版,当遇到2.x和3.x版本的不同点时,会采取演示的方式,让同学们了解. 教学预计分为四大部分,Python开发基础,Python开发进阶 ...

  2. day01.介绍python运维开发

    第1节:介绍python运维开发 课程的开场白: 学完次课程可以开发出高效的自动化软件.运维监控.聊天软件.网站等内容. 这个运维开发跟实际上的开发是有区别的,区别在我们是实现功能,但是不能向开发那样 ...

  3. Python运维开发工程师养成记(循环语句)

    图示 循环语句类型 while循环 for循环 嵌套循环 循环控制语句 break语句:在语句块执行过程中终止循环,并且跳出整个循环 continue语句:在语句块执行过程中终止当前循环,跳出该次循环 ...

  4. 阅后即焚,Python 运维开发99速成

    2019独角兽企业重金招聘Python工程师标准>>> -欢迎大家订阅微信公众号:Python从程序猿到程序员 导读 本文篇幅较长,请收藏并耐心阅读 首先请读者原谅这个文章标题有些唬 ...

  5. Python运维开发基础09-函数基础【转】

    上节作业回顾 #!/usr/bin/env python3 # -*- coding:utf-8 -*- # author:Mr.chen # 实现简单的shell命令sed的替换功能import s ...

  6. Python运维开发基础10-函数基础【转】

    一,函数的非固定参数 1.1 默认参数 在定义形参的时候,提前给形参赋一个固定的值. #代码演示: def test(x,y=2): #形参里有一个默认参数 print (x) print (y) t ...

  7. python运维开发培训_运维架构师-Python 自动化运维开发-014

    运维架构师-Python 自动化运维开发-014 九.标准数据类型 1.为什么编程语言中要有类型 类型有以下几个重要角色:对机器而言,类型描述了内存中的电荷是怎么解释的. 对编译器或者解释器而言,类型 ...

  8. python运维开发招聘_GitHub - PlutoaCharon/LiunxNotes: 校招-运维开发(Liunx,Python,Golang)面试学习笔记...

    校招-运维开发(Liunx,Python,Golang)面试学习笔记 1. 网络基础类 2. Linux系统管理类 3. Linux服务管理类 4. 数据库管理 ​ 索引(包括分类及优化方式,失效条件 ...

  9. python运维开发工程师_运维开发工程师的工作职责精选

    运维开发工程师需要负责优化.改进运维支撑系统,并保证其安全高效稳定的运行.下面是学习啦小编为您精心整理的运维开发工程师的工作职责精选. 运维开发工程师的工作职责精选1 职责: 1. 负责主导运维平台的 ...

最新文章

  1. Oracle 索引扫描的五种类型
  2. Java黑皮书课后题第11章:11.1(Triangle类)设计一个名为Triangle的类来继承GeometricObject类。该类包括:
  3. ASP.NET Core 双因素验证2FA 实战经验分享
  4. python xpath定位不到_Python+Selenium定位不到元素常见原因及解决办法(报:NoSuchElementException)...
  5. MacOS下IDEA设置智能提示不区分大小写
  6. 一些关于Spring的随笔
  7. (转载)7个去伪存真的JavaScript面试题
  8. 服务器自动post,Go Web服务器自动重定向POST请求
  9. 为informix数据库中的表创建同义词
  10. php错误日志分析_php错误日志
  11. 现学活用的XPath爬取豆瓣音乐
  12. mysql 联合查询_MySQL联合查询
  13. 极验打码平台官网地址
  14. 【黄啊码】微信小程序+php实现即时通讯聊天功能
  15. IOS开发之UI进阶(安全区高度)
  16. 赋能农业生态链,打造“云端青柚”UZ新概念
  17. 码元、码元速率、波特率、比特率理解
  18. rgb文件格式的文件读取与转换
  19. 图像形成(5)球面透视投影和近似相机模型
  20. Luminati代理动态IP,海量资源可调用!

热门文章

  1. nodejs 实践:express 最佳实践(五) connect解析
  2. Cocos2d 之FlyBird开发---GameData类
  3. java二维码生成与解析代码实现
  4. CentOS 上安装MYSQL+Apache+PHP
  5. Mysql:is not allowed to connect to this MySQL server
  6. Spring+hibernate里使用jdbc connection
  7. MxGraph从入门到精通之3:设置图形样式
  8. oracle 自动补全函数,Oracle自我补充之trunc()函数的使用方法
  9. DPDK 初识DPDK(十五)
  10. 以太网实习_一位工科男在拿到华为实习生offer后的面经干货