简介

RabbitMQ,消息队列,那它跟我们之前的学习的python的线程queue和进程的queue有什么区别呢?其实他们干的事情都是一样的。先来说说我们之前学习的python的queue吧。
1、线程queue:只是用于多个线程之间,进行数据同步交互的。
2、进程queue:只是用户父进程与子进程进行交互,或者属于同一父进程下的多个子进程进行交互。
如果是两个独立的程序,即便是python 程序,两个完全独立的python程序也依然是不用这个python的这个线程或者进程queue来通信的。
那么问题来了,现在两个独立的python程序,或者python跟Java程序,或者跟PHP程序,或者两台独立机器之间的也涉及到生产者消费者模型,这个时候用python的线程queue和进程queue就通信不了了。那怎么办呢?这个时候我们只能搞一个中间代理,这个中间代理就是RabbitMQ。

当然类似于RabbitMQ还有很多,比如:ZeroMQ、ActiveMQ…只不过现在RabbitMQ用的最多,也是最火的一个。


基本通信实例


send端(producer):
建立socket->声明管道->声明queue->通过一个exchange发送内容至queue->关闭连接

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika#通过这个实例建立一个socketconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='hello1')#这个queue名字叫做hello1
channel.basic_publish(exchange='',routing_key='hello1',#queue的名字body='hello world')#body是发送的内容
print('[x]send hello word')#关闭连接
connection.close()

receive端(consumers)
创建socket连接->创建管道->声明queue->创建回调函数callback->消费的消息->开启消费

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyanan##消费者有可能在其他机器上
import pika#建立一个socket连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建一个管道
channel = connection.channel()channel.queue_declare(queue='hello1')def callback(ch,method,properites,body):print('--->',ch,method,properites)print(" [x] Received %r" % body)channel.basic_consume(callback,#如果收到消息,就调用callback函数处理消息queue='hello1',#queue的名字no_ack=True,
)print(' [*] Waiting for messages. To exit press CTRL+C')
#这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住
channel.start_consuming()

输出:

[*] Waiting for messages. To exit press CTRL+C
---> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 53226, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.db63a03363a54cbca1a4c02537add6b1', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])> <BasicProperties>
[x] Received b'hello world'

从上面的输出可以看的出callback中的ch,method,properites分别是:
ch:是send端管道的内存对象的地址
method:指的send端的是发给谁,发给哪个Q的一些信息,一般不怎么用
properites:send端的属性,这边至的send端发过来给recive端的属性
body:是send端发过来的消息


远程配置rabbitmq
1、创建自己的账号
首先在RabbitMQ server上创建一个账号 sudo rabbitmqctl add_user 用户名 密码

sudo rabbitmqctl  add_user liyanan 123

2、配置权限
需要配置权限,允许从外面访问

sudo rabbitmqctl set_permissions -p / liyanan ".*" ".*" ".*"

3、客户端连接的时候需要配置认证参数

#认证信息
credentials = pika.PlainCredentials('alex', 'alex3714')
#连接信息
connection = pika.BlockingConnection(pika.ConnectionParameters('10.211.55.5',5672,'/',credentials))
channel = connection.channel()

小结:
1、RabbitMQ的默认端口是15672,在python中使用的模块是pika。
2、consumers中如果不声明queue的话,则如果consumers先启动,则会报错。如果是producer先启动,consumers后启动则不报错。但是如果说consumer声明了,consumer先启动就不会报错。如果是producers先启动的话,则忽略。
3、所有的socket传输都是bytes类型。
4、消费者和生产者不一定在同一台机器上,在其他机器上运行也是可以的。
5、consumers启动以后会一直运行下去,它会永远的收下去。producers可以运行多次,只要运行一次,consumers就会接收一次。


RabbitMQ消息分发轮询

之前演示了,如何通过rabbitmq实现消息队列的,而且是一个生产者对应一个消费者,那我现在想一个生产者对应多个消费者,又会有什么样的情况发生呢?

分发轮询实验

前提条件:1个生产者 —-> 多个消费者,且no_ack=True #no_ack—>no acknowledgement—>不确定
①初始化状态:3个消费者都在等待生产者发消息

②生产者发第1条消息:只有第1个消费者受到消息,第2个和第3个消费者没有收到消息



③生产者发第2条消息:只有第2个消费者受到消息,第1个和第3个没有收到的消息



④生产者发第3条消息:只有第3个收到消息,第1个和第2个没有收到消息。


总结:一个生产者对应多个消费者是采用轮询机制,公平的依次发给每一个消费者,每个消费者消费1个。


消息分发过程中突然中断

现在说,我这个消费者受到这个消息之后,就立刻打印了,假如说这个消费者收到这个消息之后,需要处理这个消息花费30s时间,在处理这个30s的过程中,消费者断电了,也就是down及了,那这个任务只处理了一半,还没有处理完毕。
遇到这种情况该如何解决?
这种情况就需要去怎么样定义的你的业务逻辑了。如果你认为这个任务必须要处理完,那就是应该这个消费者把这个任务处理完了,给生产者发一个确认。也就是说代表我处理完了。处理完了之后,生产者会把这个消息从这个消息队列中删除,就代表着这个任务确实处理完了,所以生产者一定等消费者把这个任务处理完了之后,告诉生产者,说处理完了,生产者才会把这个消息从队列中删除。
回到刚才那个问题,如果没有处理完,处理了一半,down机了,它就没有办法告诉生产者,所以在这个时候生产者应该如何解决呐?
生产者说,只要你没有给我回复确认,就代表消息没有处理完,也不用设置超时时间,你只要没有确认,就代表消息没有处理完。也就是callback函数处理完了,就代表消息处理完了,callback没有处理完,就是消息没有处理完。
1、no_ack参数
no_ack 意思是 no acknowlargement 不确定的意思,如果这个no_ack=True 表示你服务器不care也不关心这个消息是否处理完了,但是如果你关心的话,我们就不加。一般情况下,我们是不加的,rabbitmq 默认就是说消息处理完了,自动确认。

def callback(ch,method,properites,body):#print('--->',ch,method,properites)print('---',method)time.sleep(30)#模拟callback处理这个消息需要花30s的时间print(" [x] Received %r" % body)

接收端注释掉no_ack,就是说只要客户端没有给它确认,RabbitMQ就不会把这个消息删掉,生产者那一端没有删除。
处理过程:
①初始化状态:3个消费者都在等生产者发消息

②生产者发消息:第1个消费者受到消息,第2个和第3个没有收到消息

③第1个消费者down机:第1个消费者down机,则消息则发送到第2个消费者那边去了。

④第2个消费者down机:第2个消费者down机,则消息则发送到第3个消费者那边去了。

总结:只要把这个no_ack=True注释掉,就能保证消息被完整的处理。它存在RabbitMQ的sever的那个队列里面。
逻辑图:

小结:
1、一个生产者对应多个消费者,生产者发送多次消息,是采用轮询的机制,公平的分给每一个消费者。
2、消费者代码中no_ack=True,一般情况下是不加的,保证了连接断开,消息就会转给下一个消费者。
3、RabbitMQ判断如果socket断了,就知道连接断了,消息就会转给下一个消费者。
4、消费者的启动顺序,代表着是第几个消费者。


RabbitMQ消息持久化

RabbitMQ手动确认

说明:RabbitMQ是手动确认是否处理完毕的,而不是自动处理的。不信我们看下面的实验。
①生产者连续发两条消息到队列中,用rabbitmqctl.bat list_queues命令查看当前RabbitMQ server中的队列以及消息个数

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello    1
hello1    2

②消费者开始消费

③这个时候再去看,shuaigaogao消息队列中的个数

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello    1
hello1    2

还是2个消息,说明客户端那边根本就没有发确认给服务端,所以客户端(生产者)这边必须手动给服务端(生产者)一个手动确认。
手动确认需要在客户端的消息callback函数中的消息处理结束后,手动确认,确认代码如下:

def callback(ch,method,properites,body):#print('--->',ch,method,properites)print('---',method)print(" [x] Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)

这个时候消费者消费,再看看效果:

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello    1
hello1    0

消息处理完毕,为什么需要手动确认一下呢?
因为你调用的这个callback函数之后,它可能处理完收到这个消息,它接下来要干很多事情,就是说跟这个消息没有关系了。就是函数没有处理完,但是消息已经处理完了,所以你要等到函数处理完可能要花2个消息,那服务器要等2个小时候之后才能收到,所以这个也是手动确认也会处理。


RabbitMQ消息持久化实验

队列里面还为消息等着客户端(消费者)去接收,但是这个时候服务器端down机了,这个时候消息是否还在?带着这个疑问,我们来做几个实验。
1、重启队列和消息丢失
说明:服务端发送消息->重启RabbitMQ
①发送3条消息

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello    1
hello1    3

②重启RabbitMQ

[root@liyanan_python ~]# service rabbitmq-server stop
Redirecting to /bin/systemctl stop rabbitmq-server.service
[root@liyanan_python ~]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...

这个时候我的queue都没有了,都丢了,这种情况下消息都在内存里面,如果down了就down了,队列和消息都没有了
2、队列持久化
我们把队列进行持久化,就算重启我的RabbitMQ服务,我的队列也不会丢。
队列持久化,在服务端(生产者)声明queue的时候,需如下定义:

channel.queue_declare(queue='hello1',durable=True)#durable-->持久化

①服务端发3条消息

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello1    3

②重启服务器

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello1    0

这种情况的只把队列持久化了,队列自己持久化了,但是队列里面的消息还是没有了
3、队列和消息都持久化
现在我想队列和消息都持久化,那怎么办呢。
队列和消息都持久化,在服务端(生产者)声明queue的时候,需如下定义:

channel.queue_declare(queue='hello1',durable=True)#durable-->持久化
channel.basic_publish(exchange='',routing_key='hello1',#queue的名字body='hello world',properties=pika.BasicProperties(delivery_mode=2,)# make message persistent=>使消息持久化)#body是发送的内容

①服务端发3条消息

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello1    3

②重启服务器

[root@liyanan_python ~]# rabbitmqctl list_queues
Listing queues ...
hello1    3

如果想队列和消息都保证持久化,就必须是声明和发消息的时候,都需要声明持久化。
这边注意了,如果你在服务端声明持久化,在客户端也必须声明queue的时候也需要声明持久化,不然的话就会报错,声明持久化的方式和服务端一样:

channel.queue_declare(queue='hello1',durable=True)def callback(ch,method,properites,body):#print('--->',ch,method,properites)print('---',method)print(" [x] Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)

小结:
1、RabbitMQ在服务端没有声明队列queue持久化(durable=True)时,队列是存在内存中的,服务端down机了,队列也随之消失。
2、RabbitMQ在服务端只声明queue持久化,但是在发送消息时,没有声明持久化(properties=pika.BasicProperties(delivery_mode=2,)),服务器down机,只保留队列,但是不保留消息。
3、RabbitMQ在服务端声明queue持久化,并且在发消息时也声明了持久化,服务down机重启后,队列和消息都能得到保存。
4、服务端声明持久化,客户端想接受消息的话,必须也要声明queue时,也要声明持久化,不然的话,客户端执行会报错。


RabbitMQ fanout广播模式

消息公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

注意了,这种公平指的是你消费者有多大本事,就干多少活,你消费者处理的越慢,我就分发的少,你消费者处理的越多,处理的快,我就多发点消息。我server端给客户端发消息的时候,先检查一下,你现在还有多少消息,你如果处理的消息超过1条,我就不给你发了,就是你当前消息没有处理完毕,我就不给你发消息了,没有消息,我就给你发。

def callback(ch,method,properites,body):#print('--->',ch,method,properites)print('---',method)print(" [x] Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)#在消费之前加
channel.basic_consume(callback,#如果收到消息,就调用callback函数处理消息queue='hello1',#queue的名字no_ack=True,
)
fanout广播模式

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:
1、fanout:所有bind到此exchange的queue都可以接收消息(纯广播的,所有消费者都能收到消息)
2、direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
3、topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
4、headers:通过headers 来决定把消息发给哪些queue(这个很少用,一般情况下,我们用不到)

1、fanout广播模式
说明:fanout这种模式是所有绑定exchange的queue都可以接收到消息。exchange=>转换器
1.1生产者(fanout_publiser)
说明:跟之前写的不同,生产者这边并没有声明queue,因为生产者是以广播的形式,所以这边不需要声明queue

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika# 创建socket连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 创建管道
channel = connection.channel()# 声明exchange,且exchange的名字是logs,exchange的类型为fanout
channel.exchange_declare(exchange='logs', exchange_type="fanout")# 发送的消息
message = "info:hello world"# 生产一个消息
channel.basic_publish(exchange="logs",routing_key='',body=message
)
print("[X] Send {0}".format(message))# 关闭连接
connection.close()

注:这边的exchange的名字logs是随便起的
1.2消费者(fanout_consumer)
说明:消费者这边要声明一个唯一的queue_name的对象,并且从对象中获取queue名

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika# 创建一个socket
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
# 创建一个管道
channel = connection.channel()
# 声明exchange,exchange的名字logs,类型是fanout广播模式
channel.exchange_declare(exchange="logs",exchange_type="fanout")
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除,result是queue的对象
result = channel.queue_declare(exclusive=True)  # exclusive=>排他的,唯一的
# 获取queue名
queue_name = result.method.queue
# 绑定exchange
channel.queue_bind(exchange="logs",queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')# 声明回调函数
def callback(ch, method, properties, body):"回调函数"print("[X] {0}".format(body))# 消费者消费
channel.basic_consume(callback,queue=queue_name,no_ack=True)
# 启动消费模式
channel.start_consuming()

1.3代码逻辑图

①服务端没有声明queue,为什么客户端要声明一个queue?
答:生产者发消息到exchange上,exchange就会遍历一遍,所有绑定它的哪些queue,然后把消息发到queue里面,它发了queue就不管了,消费者从queue里面去收,所以就收到广播了,而不是说exchange直接就把消息发给消费者,消费者只会从queue里去读消息,且拿着queue去绑定exchange。
②为什么queue要自动生成,而不是自己手动去写?
答:我这个queue只是为了收广播的,所以如果我消费者不收了,这个queue就不需要了,所以就让它自动生成了,不需要的了,就自动销毁
2、广播实时性
广播是实时的,你不在的时候,就是你消费者没有开启的时候,发消息的时候,就没有收到,这个时候就没有了。如果消费者开启了,生产者发消息时,消费者是收的到的,这个又叫订阅发布,收音机模式
2.1消费者断开->生产者发消息->消费再连接
①消费者全部断开,生产者发消息
②消费再连接
很显然,消费者是无法收到消息的。
2.22、消费者连接->生产者发消息
①消费者处于连接状态,生产者发消息
②查看各个消费者
结论:只有在消费者处于连接状态,才能接受消息。


RabbitMQ direct广播模式

之前我们谈到的是1对多以广播的方式,发送给所有的消费者。那如果消费者可以进行过滤,有选择的进行接收我想要的消息。下面我们就来学习第二种广播方式,即 direct广播模式
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

代码实现

1、生产者(publisher)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika,sysconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()#定义direct类型的exchange
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#定义重要程度,定义什么级别的日志
severity = sys.argv[1] if len(sys.argv)>1 else 'info'
message = ' '.join(sys.argv[2:]) or 'hello world'#发送消息
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)print(" [x] Sent %r:%r" % (severity, message))
connection.close()

这边routing_key是只的你过滤的条件
2、消费者(consumer)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika, sysconnection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 定义direct类型的exchange
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 手动输入安全级别
severities = sys.argv[1:]
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)
# 循环遍历绑定消息队列
for severity in severities:channel.queue_bind(exchange="direct_logs",queue=queue_name,routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properites, body):"回调函数"print(" [x] %r:%r" % (method.routing_key, body))# 消费消息
channel.basic_consume(callback, queue=queue_name, no_ack=True)channel.start_consuming()

绑定队列时,需要循环遍历绑定。

执行实验

1、符合条件的执行结果
①服务端不输入参数执行->客户端能正常收到消息

[root@liyanan_python day11]# python3 receive3.py
Usage: receive3.py [info] [warning] [error]
[root@liyanan_python day11]# python3 receive3.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':b'hello world'

②服务端输入参数->客户端能正常收到消息

#服务端
[root@liyanan_python day11]# python3 send3.py
[x] Sent 'info':'hello world'
[root@liyanan_python day11]# python3 send3.py warning from liyanan
[x] Sent 'warning':'from liyanan'
[root@liyanan_python day11]#

#客户端
[root@liyanan_python day11]# python3 receive3.py info warning
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':b'hello world'
[x] 'warning':b'from liyanan'

结论:客户端设置只允许warning和info的级别的收到,其他的收不到
2、不符合符合条件的执行结果

#服务端
[root@liyanan_python day11]# python3 send3.py
[x] Sent 'info':'hello world'
[root@liyanan_python day11]# python3 send3.py
[x] Sent 'info':'hello world'
[root@liyanan_python day11]# python3 send3.py error from liyanan2
[x] Sent 'error':'from liyanan2'

#客户端
[root@liyanan_python day11]# python3 receive3.py error warning
[*] Waiting for logs. To exit press CTRL+C
[x] 'error':b'from liyanan2'

客户端不接受不满足info条件的消息。


RabbitMQ topic细致的消息过滤广播模式

刚才我们做了一个区分,把error、warning绑定级别把消息区分了。我们回到日志上,如果想做的更细致的区分,比如说,你现在搜索的有error,有warning等,在Linux上有一个系统日志,这个系统日志搜索所有应用的系统日志。所有程序都在这个日志里面打日志。那如果我想划分出来。什么是mysql的发出来的日志,什么是apache发出来的日志。然后mysql日志里面同时是info,又包含warning,error。Apache也是一样,所以我们要做更细的区分,更细致的消息过滤。

代码实现

1、生产者(publicer)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika, sysconnection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 声明一个topic的exchange
channel.exchange_declare(exchange="topic_logs",exchange_type="topic")routing_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info"message = " ".join(sys.argv[2:]) or "hello world"
channel.basic_publish(exchange="topic_logs",routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

这边修改了exchange的类型,类型为topic,topic=>话题,意思是:你对什么话题感兴趣,就收什么消息。
2、消费者(consumer)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika, sysconnection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))channel = connection.channel()
# 声明topic类型的exchange
channel.exchange_declare(exchange="topic_logs",exchange_type="topic")result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuebanding_keys = sys.argv[1:]
if not banding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)
# 循环绑定queue
for banding_key in banding_keys:channel.queue_bind(exchange="topic_logs",queue=queue_name,routing_key=banding_key)print(' [*] Waiting for logs. To exit press CTRL+C')# 回调函数
def callback(ch, method, properites, body):"回调函数"print(" [x] %r:%r" % (method.routing_key, body))# 消费者消费
channel.basic_consume(callback, queue=queue_name, no_ack=True)channel.start_consuming()

这边除了定义exchange的类型变了,其他的都没有改变。

运行结果

1、客户端匹配mysql..error

#客户端
[root@liyanan_python day11]# python3 receive4.py mysql.* *.error
[*] Waiting for logs. To exit press CTRL+C
[x] 'mysql.info':b'hello world'
[x] 'liyanan.error':b'hello world'

#服务端
[root@liyanan_python day11]# python3 send4.py mysql.info
[x] Sent 'mysql.info':'hello world'
[root@liyanan_python day11]# python3 send4.py liyanan.error
[x] Sent 'liyanan.error':'hello world'

2、匹配所有的

#客户端
[root@liyanan_python day11]# python3 receive4.py '#'
[*] Waiting for logs. To exit press CTRL+C
[x] 'abc.mysql.info':b'hello world'
[x] 'mysql.info':b'hello world'
[x] 'mysql.info':b'hello world'

#服务端
[root@liyanan_python day11]# python3 send4.py abc.mysql.info
[x] Sent 'abc.mysql.info':'hello world'
[root@liyanan_python day11]# python3 send4.py mysql.info
[x] Sent 'mysql.info':'hello world'
[root@liyanan_python day11]# python3 send4.py mysql.info
[x] Sent 'mysql.info':'hello world'
匹配规则

To receive all the logs run: =># 是匹配所有的

python receive_logs_topic.py “#”
To receive all logs from the facility “kern”: =>只匹配kern开头的

python receive_logs_topic.py “kern.*”
Or if you want to hear only about “critical” logs: =>匹配critical结尾的

python receive_logs_topic.py “*.critical”
You can create multiple bindings:

python receive_logs_topic.py “kern.” “.critical”
And to emit a log with a routing key “kern.critical” type:

python emit_log_topic.py “kern.critical” “A critical kernel error”


RabbitMQ rpc实现

之前我们都是单向发送消息,客户端发送消息给服务端,那么问题来了,我现在发一个命令给远程客户端,让它去执行,执行之后的结果,我想把这个结果再返回。这个模型叫什么呐,这种模型叫RPC=>remote procedure call。
怎么返回这个消息呢?
答:就server 端和客户端既是消费者,又是生产者。

逻辑代码

1、RPC Client

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pika, uuid, timeclass FibonacciRpcClient(object):"斐波那契数列rpc客户端"def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)def on_response(self, ch, method, props, body):print("---->", method, props)if self.corr_id == props.correlation_id:  # 我发过去的结果就是我想要的结果,保持数据的一致性self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.publish(exchange="",routing_key="rpc_queue",properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id),body=str(n))while self.response is None:self.connection.process_data_events()  # 非阻塞版的start_consumer()print("no msg....")time.sleep(0.5)return int(self.response)if __name__ == "__main__":fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(30)print(" [.] Got %r" % response)

注:
1、我是想不阻塞,而是想每过一段时间,就过来检查一下,就不能用start_consumer,而是用connection.process_data_evevts(),它是不阻塞的,如果收到消息就收到,收不到消息也返回,就继续往下执行。
2、reply_to就是想让服务器执行完命令之后,把结果返回到这个queue里面。
3、在while self.respose is None中的代码我可以不做time.sleep,我这边可以发消息给服务器端,这个消息不一定按顺序发给服务器端,如果不做self.corr_id == props.correlation_id的验证,那数据就可能对不上了。
2、RPC Server

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")def fib(n):"斐波那契数列"if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange="",routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id= \props.correlation_id),# props的是客户端的发过来的信息,这边把correlation_id返回给客户端做验证body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")print(" [x] Awaiting RPC requests")
channel.start_consuming()

注:props.reply_to,这个就是客户端返回过来的queue。
问:如果客户端和服务用的是同一个queue,会有什么影响?
答:如果客户端也发到rpc_queue中,那么客户端就会收到自己的消息,就会形成一个死循坏,把自己给玩死了。



Redis string操作

Redis 介绍

redis是业界主流的key-value nosql 数据库之一。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set –有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis优点
1、异常快速 : Redis是非常快的,每秒可以执行大约110000设置操作,81000个/每秒的读取操作。
2、支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型。
这使得在应用中很容易解决的各种问题,因为我们知道哪些问题处理使用哪种数据类型更好解决。
3、操作都是原子的 : 所有 Redis 的操作都是原子,从而确保当两个客户同时访问 Redis 服务器得到的是更新后的值(最新值)。
4、MultiUtility工具:Redis是一个多功能实用工具,可以在很多如:缓存,消息传递队列中使用(Redis原生支持发布/订阅),在应用程序中,如:Web应用程序会话,网站页面点击数等任何短暂的数据;
运行redis:

redis-cli
python操作redis

1、安装redis模块

sudo pip install redis
or
sudo easy_install redis
or
源码安装详见:https://github.com/WoLpH/redis-py

2、连接方式
①逻辑图

②操作模式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

import redisr = redis.Redis(host='localhost', port=6379)
r.set('foo', 'Bar')
print r.get('foo')

③连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

import redispool = redis.ConnectionPool(host="localhost",port=6379)r = redis.Redis(connection_pool=pool)
r.set('foo','bar')
print(r.get('foo'))  #打印出来的是bytes类型,因为在python3中,通过socket的都是bytes类型
python操作

1、String操作
redis中的String在在内存中按照一个name对应一个value来存储。如图:

2、操作方式
2.1set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中设置值,默认,不存在则创建,存在则修改
参数:ex,过期时间(秒)px,过期时间(毫秒)nx,如果设置为True,则只有name不存在时,当前set操作才执行xx,如果设置为True,则只有name存在时,岗前set操作才执行

redis命令操作:

127.0.0.1:6379> set age 22  #设置age为22
OK
127.0.0.1:6379> set age 22 ex 2  #设置age为22,过期时间为2秒
OK
127.0.0.1:6379> set age 23
OK
127.0.0.1:6379> set age 24 nx #当age的值已经存在,则无法重新设置新的值
(nil)
127.0.0.1:6379> get age
"23"
127.0.0.1:6379> set age 24 xx #当age的值已经存在,则才能重新设置新的值
OK
127.0.0.1:6379> get age
"24"

2.2setnx(name, value)

设置值,只有name不存在时,执行设置操作(添加),相当于set name liyanan nx

2.3setex(name, value, time)

# 设置值
# 参数:# time,过期时间(数字秒 或 timedelta对象)
#相当于 set name liyanan ex 2

2.4psetex(name, time_ms, value)

# 设置值
# 参数:# time_ms,过期时间(数字毫秒 或 timedelta对象)
#相当于 set name liyanan px 2

2.5mset(*args, **kwargs)

批量设置值
如:mset(k1='v1', k2='v2')或mget({'k1': 'v1', 'k2': 'v2'})

redis命令操作:

127.0.0.1:6379> mset name liyanan age 24
OK
127.0.0.1:6379> keys *
1) "longin_users"
2) "val"
3) "foo"
4) "age"
5) "name"
127.0.0.1:6379> get name
"liyanan"

2.6get(name)

获取值

redis命令操作:

127.0.0.1:6379> set age 25
OK
127.0.0.1:6379> get age
"25"

2.7mget(keys,*args)

批量获取
如:mget('abc', 'def')或r.mget(['abc', 'def'])

2.8getset(name,value)

设置新值并获取原来的值

redis命令操作:

127.0.0.1:6379[1]> set age 23  #设置age的值为23
OK
127.0.0.1:6379[1]> getset age 18  #重新设置age的值为18,并返回原来的值23
"23"
127.0.0.1:6379[1]> get age  #获取age当前的值
"18"

2.9getrange(key,start,end)

# 获取子序列(根据字节获取,非字符)
# 参数:# name,Redis 的 name# start,起始位置(字节)# end,结束位置(字节)
# 如: "李亚楠" ,0-3表示 "李"

redis命令操作:

127.0.0.1:6379> set name liyanan
OK
127.0.0.1:6379> getrange name 0 1#获取0到1的字符串,这边相当于切片,但是是顾头又顾尾
"li"

2.10setrange(name,offset,value)

# 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
# 参数:# offset,字符串的索引,字节(一个汉字三个字节)# value,要设置的值

redis命令操作:

127.0.0.1:6379> get name
"liyanan"
127.0.0.1:6379> setrange name 3 m#把字符串中的第3个字符替换成m
(integer) 7
127.0.0.1:6379> get name
"liymnan"

2.11setbit(name,offset,value)

# 对name对应值的二进制表示的位进行操作# 参数:# name,redis的name# offset,位的索引(将值变换成二进制后再进行索引)# value,值只能是 1 或 0# 注:如果在Redis中有一个对应: n1 = "foo",那么字符串foo的二进制表示为:01100110 01101111 01101111所以,如果执行 setbit('n1', 7, 1),则就会将第7位设置为1,那么最终二进制则变成 01100111 01101111 01101111,即:"goo"# 扩展,转换二进制表示:# source = "武沛齐"source = "foo"for i in source:num = ord(i)print bin(num).replace('b','')特别的,如果source是汉字 "武沛齐"怎么办?答:对于utf-8,每一个汉字占 3 个字节,那么 "武沛齐" 则有 9个字节对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000-------------------------- ----------------------------- -----------------------------武                        沛                          齐

*用途举例,用最省空间的方式,存储在线用户数及分别是哪些用户在线
2.12getbit(name,offset)

# 获取name对应的值的二进制表示中的某位的值 (0或1)

2.13bitcount(key,start=None,end=None)

# 获取name对应的值的二进制表示中 1 的个数
# 参数:# key,Redis的name# start,位起始位置# end,位结束位置

2.14strlen(name)

# 返回name对应值的字节长度(一个汉字3个字节)

redis的操作:

127.0.0.1:6379> get name
"liymnan"
127.0.0.1:6379> strlen name#计算name值得字符串的长度
(integer) 7

2.15incr(self,name,amount=1)

# 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。# 参数:# name,Redis的name# amount,自增数(必须是整数)# 注:同incrby

redis的操作:

127.0.0.1:6379[1]> INCR user_name
(integer) 1
127.0.0.1:6379[1]> INCR user_name
(integer) 2
127.0.0.1:6379[1]> INCR user_name
(integer) 3

*用途举例,这个一般用于计算用户的登录的总量的。
2.16incrbyfloat(self,name,amount=1.0)

# 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。# 参数:# name,Redis的name# amount,自增数(浮点型)

redis的操作:

127.0.0.1:6379[1]> incrbyfloat user_login 1.2
"1.2"
127.0.0.1:6379[1]> incrbyfloat user_login 1.4
"2.6"

2.17decr(self, name, amount=1)

# 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。# 参数:# name,Redis的name# amount,自减数(整数)

redis的操作:

127.0.0.1:6379[1]> decr user_name
(integer) 2
127.0.0.1:6379[1]> decr user_name
(integer) 1
127.0.0.1:6379[1]> decr user_name
(integer) 0

*用途举例,用于如果用户账户失效,则减去
2.18append(key,value)

# 在redis name对应的值后面追加内容# 参数:key, redis的namevalue, 要追加的字符串

redis的操作:

127.0.0.1:6379> get name
"liymnan"
127.0.0.1:6379> append name wu#在name值后面追加wu字符串
(integer) 9
127.0.0.1:6379> get name
"liymnanwu"

2.19help变量名

#查看操作的使用方法

redis的操作方法:

127.0.0.1:6379[1]> help set    #help  命令SET key value [EX seconds] [PX milliseconds] [NX|XX]  #set命令的用法summary: Set the string value of a key  #作用since: 1.0.0group: string    #所属类型,是string

Redis hash操作

之前我们学习的是redis的string操作,今天我们来学习 hash 操作,hash表现形式上有些像pyhton中的dict,可以存储一组关联性较强的数据。redis中Hash在内存中的存储格式如下图:

hash操作

1、hset(name, key, value)

# name对应的hash中设置一个键值对(不存在,则创建;否则,修改)# 参数:# name,redis的name# key,name对应的hash中的key# value,name对应的hash中的value# 注:# hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)

redis命令操作:

127.0.0.1:6379> hset info name liyanan#设置info name值
(integer) 1
127.0.0.1:6379> hgetall info#获取info中所以的key和value
1) "name"
2) "liyanan"
127.0.0.1:6379> hget info name#获取info 中name的值
"liyanan"

2、hmset(name, mapping)

# 在name对应的hash中批量设置键值对# 参数:# name,redis的name# mapping,字典,如:{'k1':'v1', 'k2': 'v2'}# 如:# r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

redis命令操作:

127.0.0.1:6379> hmset info2 k1 1 k2 2  #设置多个key-value
OK
127.0.0.1:6379> hkeys info2  #查看所有的key
1) "k1"
2) "k2"
127.0.0.1:6379> hgetall info2  #查看所有的key-value
1) "k1"
2) "1"
3) "k2"
4) "2"

3、hget(name,key)

# 在name对应的hash中获取根据key获取value

4、hmget(name, keys, *args)

# 在name对应的hash中获取多个key的值# 参数:# name,reids对应的name# keys,要获取key集合,如:['k1', 'k2', 'k3']# *args,要获取的key,如:k1,k2,k3# 如:# r.mget('xx', ['k1', 'k2'])# 或# print r.hmget('xx', 'k1', 'k2')

5、hgetall(name)

获取name对应hash的所有键值

6、hlen(name)

# 获取name对应的hash中键值对的个数

redis的命令操作:

127.0.0.1:6379> hmset info2 k1 1 k2 2
OK
127.0.0.1:6379> hlen info2 #查看info2的所有keys的个数
(integer) 2

7、hkeys(name)

# 获取name对应的hash中所有的key的值

8、hvals(name)

# 获取name对应的hash中所有的value的值

redis的命令操作:

127.0.0.1:6379> hmset info2 k1 1 k2 2
OK
127.0.0.1:6379> hvals info2  #显示values的值
1) "1"
2) "2"

9、hexists(name, key)

# 检查name对应的hash是否存在当前传入的key

redis的命令操作:

127.0.0.1:6379> hmset info2 k1 1 k2 2
OK
127.0.0.1:6379> hexists info2 name  #不存在返回0
(integer) 0
127.0.0.1:6379> hexists info2 k1  #存在返回1
(integer) 1

10、hdel(name,*keys)

# 将name对应的hash中指定key的键值对删除

redis的命令操作:

127.0.0.1:6379> hgetall info2
1) "k1"
2) "1"
3) "k2"
4) "2"
127.0.0.1:6379> hdel info2 k1  #删除k1
(integer) 1
127.0.0.1:6379> hgetall info2  #显示k1已经被删除
1) "k2"
2) "2"

11、hincrby(name, key, amount=1)

# 自增name对应的hash中的指定key的值,不存在则创建key=amount
# 参数:# name,redis中的name# key, hash对应的key# amount,自增数(整数)

redis的命令操作:

127.0.0.1:6379> hincrby info2 k2 1
(integer) 3
127.0.0.1:6379> hincrby info2 k2 2
(integer) 5

12、hincrbyfloat(name, key, amount=1.0)

# 自增name对应的hash中的指定key的值,不存在则创建key=amount# 参数:# name,redis中的name# key, hash对应的key# amount,自增数(浮点数)# 自增name对应的hash中的指定key的值,不存在则创建key=amount

13、hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:

HSCAN myhash 0

Start a hash scan with fields matching a pattern with:

HSCAN myhash 0 MATCH order_*

Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:

HSCAN myhash 0 MATCH order_* COUNT 1000

# 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆# 参数:# name,redis的name# cursor,游标(基于游标分批取获取数据)# match,匹配指定key,默认None 表示所有的key# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数# 如:# 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)# 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)# ...# 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕

redis的命令操作:

127.0.0.1:6379> HGETALL info2
1) "k1"
2) "1"
3) "k2"
4) "5"
127.0.0.1:6379> HSCAN info2 0 match n*#匹配n开头的key的信息
1) "0"
2) (empty list or set)

14、hscan_iter(name, match=None, count=None)

# 利用yield封装hscan创建生成器,实现分批去redis中获取数据# 参数:# match,匹配指定key,默认None 表示所有的key# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数# 如:# for item in r.hscan_iter('xx'):#    print item

Redis list集合操作

List操作,redis中的List在在内存中按照一个name对应一个List来存储.

操作

1、lpush(name,values)

# 在name对应的list中添加元素,每个新的元素都添加到列表的最左边# 如:# r.lpush('oo', 11,22,33)# 保存顺序为: 33,22,11

redis命令操作:

127.0.0.1:6379> lpush names liyanan liqin liyifeng
(integer) 3
127.0.0.1:6379> lrange names 0 -1
1) "liyifeng"
2) "liqin"
3) "liyanan"

注:lpush => left push的意思,表示从左向右操作,上面的逻辑如下图:

2、rpush(name,values)

# 在name对应的list中添加元素,每个新的元素都添加到列表的最右边,表示从右向左操作# 如:# r.rpush('oo', 11,22,33)# 保存顺序为: 33,22,11

redis命令操作:

127.0.0.1:6379> rpush names2 liyanan liqin liyifeng
(integer) 3
127.0.0.1:6379> lrange names2 0 -1
1) "liyanan"
2) "liqin"
3) "liyifeng"

注:rpush => right push的意思,表示从右向左操作,上面的逻辑如下图:

3、lpushx(name,value)

# 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边# 更多:# rpushx(name, value) 表示从右向左操作

redis的命令操作:

127.0.0.1:6379> lpushx names lixiaolong#在列表的最左边添加元素
(integer) 4
127.0.0.1:6379> lrange names 0 -1
1) "lixiaolong"
2) "liyifeng"
3) "liqin"
4) "liyanan"

4、llen(name)

# name对应的list元素的个数

redis的命令操作:

127.0.0.1:6379> llen names
(integer) 4

5、linsert(name, where, refvalue, value))

# 在name对应的列表的某一个值前或后插入一个新值# 参数:# name,redis的name# where,BEFORE或AFTER# refvalue,标杆值,即:在它前后插入数据# value,要插入的数据

redis的命令操作:

127.0.0.1:6379> linsert names before liqin liyundi
(integer) 5
127.0.0.1:6379> lrange names 0 -1
1) "lixiaolong"
2) "liyifeng"
3) "liyundi"#已经插入
4) "liqin"
5) "liyanan"

6、r.lset(name, index, value)

# 对name对应的list中的某一个索引位置重新赋值# 参数:# name,redis的name# index,list的索引位置# value,要设置的值

redis的命令操作:

127.0.0.1:6379> lrange names 0 -1
1) "lixiaolong"
2) "liyifeng"
3) "liyundi"
4) "liqin"
5) "liyanan"
127.0.0.1:6379> lset names 1 lixiaolu
OK
127.0.0.1:6379> lrange names 0 -1
1) "lixiaolong"
2) "lixiaolu"#已经把李易峰改成李小璐了
3) "liyundi"
4) "liqin"
5) "liyanan"

7、r.lrem(name, value, num)

# 在name对应的list中删除指定的值# 参数:# name,redis的name# value,要删除的值# num,  num=0,删除列表中所有的指定值;# num=2,从前到后,删除2个;# num=-2,从后向前,删除2个

redis的命令操作:

127.0.0.1:6379> lrange names 0 -1
1) "lixiaolong"
2) "lixiaolu"
3) "liyundi"
4) "liqin"
5) "liyanan"
127.0.0.1:6379> lrem names 1 lixiaolong
(integer) 1
127.0.0.1:6379> lrange names 0 -1
1) "lixiaolu"
2) "liyundi"
3) "liqin"
4) "liyanan"

8、lpop(name)

# 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素# 更多:# rpop(name) 表示从右向左操作

redis的命令操作:

127.0.0.1:6379> lrange names 0 -1
1) "lixiaolu"
2) "liyundi"
3) "liqin"
4) "liyanan"
127.0.0.1:6379> LPOP names#删除最左边的值,并且返回
"lixiaolu"
127.0.0.1:6379> lrange names 0 -1
1) "liyundi"
2) "liqin"
3) "liyanan"

9、lindex(name, index)

在name对应的列表中根据索引获取列表元素

10、lrange(name, start, end)

# 在name对应的列表分片获取数据
# 参数:# name,redis的name# start,索引的起始位置# end,索引结束位置

redis的命令操作:

127.0.0.1:6379> lrange names 0 -1
1) "liyundi"
2) "liqin"
3) "liyanan"
127.0.0.1:6379> lrange names 0 1
1) "liyundi"
2) "liqin"

11、ltrim(name, start, end)

# 在name对应的列表中移除没有在start-end索引之间的值
# 参数:# name,redis的name# start,索引的起始位置# end,索引结束位置

redis的操作:

127.0.0.1:6379> lrange names 0 -1
1) "liyundi"
2) "liqin"
3) "liyanan"
127.0.0.1:6379> ltrim names 0 1
OK
127.0.0.1:6379> lrange names 0 -1
1) "liyundi"
2) "liqin"

12、rpoplpush(src, dst)

# 从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
# 参数:# src,要取数据的列表的name# dst,要添加数据的列表的name

redis的命令操作:

127.0.0.1:6379> lrange names 0 -1
1) "liyundi"
2) "liqin"
127.0.0.1:6379> lrange names2 0 -1
1) "liyanan"
2) "liqin"
3) "liyifeng"
127.0.0.1:6379> lrange names 0 -1
1) "liyundi"
2) "liqin"
127.0.0.1:6379> rpoplpush names2 names
"liyifeng"
127.0.0.1:6379> lrange names2 0 -1
1) "liyanan"
2) "liqin"
127.0.0.1:6379> lrange names 0 -1
1) "liyifeng"
2) "liyundi"
3) "liqin"

13、blpop(keys, timeout)

# 将多个列表排列,按照从左到右去pop对应列表的元素# 参数:# keys,redis的name的集合# timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞# 更多:# r.brpop(keys, timeout),从右向左获取数据

redis的命令操作:

127.0.0.1:6379> lrange names2 0 -1
1) "liyanan"
2) "liqin"
127.0.0.1:6379> BLPOP names2 2
1) "names2"
2) "liyanan"
127.0.0.1:6379> lrange names2 0 -1
1) "liqin"

但是又一种情况,如果names2中为空值?

127.0.0.1:6379> lrange names2 0 -1
(empty list or set)

从上面可以看得出来,当我names2中没有值的时候,就会在那边一直卡着40秒,那我如果再开一个 终端 话,然后往里面push值,会发生什么?

这意味着这个names2的列表,是两个进程或者是两个程序,一个进程我可以往里面放值,另外一个进程我可以往里面取值,这个有点像队列。
14、brpoplpush(src, dst, timeout=0)

# 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧# 参数:# src,取出并要移除元素的列表对应的name# dst,要插入元素的列表对应的name# timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞

redis的命令操作:

127.0.0.1:6379> lrange names2 0 -1
1) "li"
127.0.0.1:6379> lrange names 0 -1
1) "liyifeng"
2) "liyundi"
3) "liqin"
127.0.0.1:6379> brpoplpush names names2 1
"liqin"
127.0.0.1:6379> lrange names 0 -1
1) "liyifeng"
2) "liyundi"
127.0.0.1:6379> lrange names2 0 -1
1) "liqin"
2) "li"

跟上面一样,如果names列表中的为空值,结果还是一样的,如果names列表为空的话,那也会等40s中以后会自动退出。我们跟刚刚一样,开另外一个终端往names列表中push值。
问: 这样做有什么好处呢?
两个进程,现在就两个列表,一个进程里面的列表可以往里面放数据,另外一个列表进程get到数据,然后把他移到自己的进程里面的列表中,然后把第1个列表中的数据删掉,这样就保证了数据的一致性的情况下,你同步删,同步往里面放,我同步往外取,相互不影响。


集合set 和 有序集合操作

set 集合操作

Set集合是无序的,Set集合就是不允许重复的列表
1、sadd(name,values)

# name对应的集合中添加元素

redis的命令操作:

127.0.0.1:6379> sadd nams liyanan liqin liyifeng#names集合中添加元素
(integer) 3
127.0.0.1:6379> smembers nams#set集合去重,无序的
1) "liyanan"
2) "liyifeng"
3) "liqin"

2、scard(name)

获取name对应的集合中元素个数

redis命令操作:

127.0.0.1:6379> SMEMBERS nams
1) "liyanan"
2) "liyifeng"
3) "liqin"
127.0.0.1:6379> scard nams#统计names集合中的元素个数
(integer) 3

3、sdiff(keys, *args)

在第一个name对应的集合中且不在其他name对应的集合的元素集合

redis的命令操作:

127.0.0.1:6379> smembers nams
1) "liyanan"
2) "liyifeng"
3) "liqin"
127.0.0.1:6379> smembers nams2
1) "lixiaolu"
2) "liyanan"
3) "liqin"
127.0.0.1:6379> sdiff nams nams2#获取names集合和names2集合的差集,就是names有,names2没有的
1) "liyifeng"

4、sdiffstore(dest, keys, *args)

# 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中

redis的命令操作:

127.0.0.1:6379> smembers nams2
1) "lixiaolu"
2) "liyanan"
3) "liqin"
127.0.0.1:6379> smembers nams
1) "liyanan"
2) "liyifeng"
3) "liqin"
127.0.0.1:6379> sdiffstore nams3 nams nams2#names和names2的差集赋值给name3
(integer) 1
127.0.0.1:6379> smembers nams3
1) "liyifeng"

5、sinter(keys, *args)

# 获取多一个name对应集合的交集

redis的命令操作:

127.0.0.1:6379> smembers nams
1) "liyanan"
2) "liyifeng"
3) "liqin"
127.0.0.1:6379> smembers nams2
1) "lixiaolu"
2) "liyanan"
3) "liqin"
127.0.0.1:6379> sinter nams nams2#取names和names2的交集
1) "liyanan"
2) "liqin"

6、sinterstore(dest, keys, *args)

# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中

redis的命令操作:

127.0.0.1:6379> sinterstore nams4 nams nams2
(integer) 2
127.0.0.1:6379> smembers nams4
1) "liyanan"
2) "liqin"

7、sismember(name, value)

# 检查value是否是name对应的集合的成员

redis的命令操作:

127.0.0.1:6379> smembers nams4
1) "liyanan"
2) "liqin"
127.0.0.1:6379> sismember nams4 liqin#存在
(integer) 1
127.0.0.1:6379> sismember nams4 alex#不存在
(integer) 0

8、smembers(name)

# 获取name对应的集合的所有成员

9、smove(src, dst, value)

# 将某个成员从一个集合中移动到另外一个集合

redis的命令操作:

127.0.0.1:6379> smove nams nams2 liyifeng把names集合中的李易峰移至names2
(integer) 1

10、spop(name)

# 从集合的右侧(尾部)移除一个成员,并将其返回

11、 srandmember(name, numbers)

# 从name对应的集合中随机获取 numbers 个元素

12、srem(name, values)

# 在name对应的集合中删除某些值

redis的命令操作:

127.0.0.1:6379> srem nams2 liyanan
(integer) 1

13、sunion(keys, *args)

# 获取多一个name对应的集合的并集

14、sunionstore(dest,keys, *args)

# 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中

15、sscan(name, cursor=0, match=None, count=None)

匹配集合中的符合规则的元素

redis的命令操作:

127.0.0.1:6379> smembers nams2
1) "lixiaolu"
2) "liyifeng"
3) "liqin"
127.0.0.1:6379> sscan nams2 0 match l*
1) "0"
2) 1) "lixiaolu"2) "liyifeng"3) "liqin"

16、sscan_iter(name, match=None, count=None)

# 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大
有序序列(Sort Set)

有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
1、zadd(name, *args, **kwargs)

# 在name对应的有序集合中添加元素
# 如:# zadd('zz', 'n1', 1, 'n2', 2)# 或# zadd('zz', n1=11, n2=22)

redis的命令操作:

127.0.0.1:6379> zadd nemes 5 liyanan 10 liqin 8 liyifeng 4 lixiaolu# zadd 键  权重  值
(integer) 4
127.0.0.1:6379> zrange nemes 0 -1#按权重排序
1) "lixiaolu"
2) "liyanan"
3) "liyifeng"
4) "liqin"

注:值永远只有1个,如果重复添加的话,只是修改了值得权重
2、zcard(name)

# 获取name对应的有序集合元素的数量

3、zcount(name, min, max)

# 获取name对应的有序集合中分数 在 [min,max] 之间的个数

redis命令的操作:

127.0.0.1:6379> zcount nemes 0 8  #score 在0-8 之间的个数
(integer) 3

4、zincrby(name, value, amount)

# 自增name对应的有序集合的 name 对应的分数

5、r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

# 按照索引范围获取name对应的有序集合的元素# 参数:# name,redis的name# start,有序集合索引起始位置(非分数)# end,有序集合索引结束位置(非分数)# desc,排序规则,默认按照分数从小到大排序# withscores,是否获取元素的分数,默认只获取元素的值# score_cast_func,对分数进行数据转换的函数# 更多:# 从大到小排序# zrevrange(name, start, end, withscores=False, score_cast_func=float)# 按照分数范围获取name对应的有序集合的元素# zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)# 从大到小排序# zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

redis 的命令操作:

127.0.0.1:6379> zrange nemes 0 -1
1) "lixiaolu"
2) "liyanan"
3) "liyifeng"
4) "liqin"
127.0.0.1:6379> zrange nemes 0 -1 withscores #显示权重
1) "lixiaolu"
2) "4"
3) "liyanan"
4) "5"
5) "liyifeng"
6) "8"
7) "liqin"
8) "10"

6、zrank(name, value)

# 获取某个值在 name对应的有序集合中的排行(从 0 开始)# 更多:# zrevrank(name, value),从大到小排序

redis的操作:

127.0.0.1:6379> zrange nemes 0 -1
1) "lixiaolu"
2) "liyanan"
3) "liyifeng"
4) "liqin"
127.0.0.1:6379> zrank nemes liyanan
(integer) 1

7、zrem(name, values)

# 删除name对应的有序集合中值是values的成员# 如:zrem('zz', ['s1', 's2'])

8、zremrangebyrank(name, min, max)

# 根据排行范围删除

redis的命令操作:

127.0.0.1:6379> zrange nemes 0 -1
1) "lixiaolu"
2) "liyanan"
3) "liyifeng"
4) "liqin"
127.0.0.1:6379> ZREMRANGEBYRANK nemes 0 1#删除0 - 1 的元素
(integer) 2
127.0.0.1:6379> zrange nemes 0 -1
1) "liyifeng"
2) "liqin"

9、zremrangebyscore(name, min, max)

# 根据分数范围删除

10、zscore(name, value)

# 获取name对应有序集合中 value 对应的分数

11、zinterstore(dest, keys, aggregate=None)

# 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM  MIN  MAX

redis的命令操作:

127.0.0.1:6379> zinterstore z3 2 z1 z2  #zinterstore  新集合  参与集合个数  集合1  集合2 ...
(integer) 2
127.0.0.1:6379> zrange z1 0 -1 withscores
1) "jack"
2) "5"
3) "alex"
4) "10"
5) "liyanan"
6) "20"
127.0.0.1:6379> zrange z2 0 -1 withscores
1) "liyanan"
2) "5"
3) "alex"
4) "10"
127.0.0.1:6379> zrange z3 0 -1 withscores  #只有值相同的才加
1) "alex"
2) "20"
3) "liyanan"
4) "25"

12、zunionstore(dest, keys, aggregate=None)

# 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM  MIN  MAX

redis的命令操作:

127.0.0.1:6379> zrange z1 0 -1 withscores
1) "jack"
2) "5"
3) "alex"
4) "10"
5) "liyanan"
6) "20"
127.0.0.1:6379> zrange z2 0 -1 withscores
1) "liyanan"
2) "5"
3) "alex"
4) "10"
127.0.0.1:6379> zunionstore z4 2 z1 z2  #并集,zunionstore  新集合  集合个数  集合1 集合2....
(integer) 3
127.0.0.1:6379> zrange z4 0 -1 withscores
1) "jack"
2) "5"
3) "alex"
4) "20"
5) "liyanan"
6) "25"

13、zscan(name, cursor=0, match=None, count=None, score_cast_func=float)

匹配有序集合中的符合规则的元素

redis的命令操作:

127.0.0.1:6379> zrange nemes 0 -1
1) "liyifeng"
2) "liqin"
127.0.0.1:6379> zscan nemes 0 match *qin
1) "0"
2) 1) "liqin"2) "10"

14、zscan_iter(name, match=None, count=None,score_cast_func=float)

# 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作

Redis 发布订阅

其他常用操作

1、delete(*names)

# 根据删除redis中的任意数据类型

2、exists(name)

# 检测redis的name是否存在

3、keys(pattern=’*’)

# 根据模型获取redis的name# 更多:# KEYS * 匹配数据库中所有 key 。# KEYS h?llo 匹配 hello , hallo 和 hxllo 等。# KEYS h*llo 匹配 hllo 和 heeeeello 等。# KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

4、expire(name ,time)

# 为某个redis的某个name设置超时时间

5、rename(src, dst)

# 对redis的name重命名为

6、move(name, db))

# 将redis的某个值移动到指定的db下

注:redis的数据库一共有16个,分别是0-15,用redis命令操作的时候,用select db_index来切换数据库,如select 2
7、randomkey()

# 随机获取一个redis的name(不删除)

8、type(name)

# 获取name对应值的类型

9、scan(cursor=0, match=None, count=None)

正则匹配name

10、scan_iter(match=None, count=None)

# 同字符串操作,用于增量迭代获取key
管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport redispool = redis.ConnectionPool(host='localhost',port=6379,db=2)#可以设置dbr = redis.Redis(connection_pool=pool)pipe = r.pipeline(transaction=True)pipe.set('name','alex')#这边只是设置了,但是没有执行
pipe.set('role','sb')pipe.execute()#当执行execute,才会批量去执行上面的命令
发布订阅

1、原理图
发布者:服务器 订阅者:Dashboad和数据处理


2、实现
2.1、RedisHelper
说明:对发布订阅进行封装

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananimport redisclass RedisHelper(object):def __init__(self):self.__conn = redis.Redis(host="localhost")self.chan_sub = 'fm104.5'self.chan_pub = 'fm104.5'def public(self, msg):"发布"self.__conn.publish(self.chan_pub, msg)  # 发布消息return Truedef subscribe(self):"订阅"pub = self.__conn.pubsub()  # 打开收音机pub.subscribe(self.chan_sub)  # 调频道pub.parse_response()  # 准备接收return pub

2.2、订阅者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananfrom monitor.redis_helper import RedisHelperobj = RedisHelper()
redis_sub = obj.subscribe()while True:msg = redis_sub.parse_response()  # 第2次准备接收动作print(msg)

2.3、发布者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author:liyananfrom monitor.redis_helper import RedisHelperobj = RedisHelper()
obj.public("hello world")  # 发布消息

3、redis命令订阅发布
3.1、发布

127.0.0.1:6379> help publishPUBLISH channel messagesummary: Post a message to a channelsince: 2.0.0group: pubsub127.0.0.1:6379> publish "fm104.5" helloword  #publish 频道  消息
(integer) 1

3.2订阅

127.0.0.1:6379> help subscribeSUBSCRIBE channel [channel ...]summary: Listen for messages published to the given channelssince: 2.0.0group: pubsub127.0.0.1:6379> subscribe "fm104.5"  #subscribe  频道,可以订阅多个频道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "fm104.5"
3) (integer) 1

day11_rabbitmq和redis相关推荐

  1. Spring AOP + Redis解决重复提交的问题

    Spring AOP + Redis解决重复提交的问题 用户在点击操作的时候,可能会连续点击多次,虽然前端可以通过设置按钮的disable的属性来控制按钮不可连续点击,但是如果别人拿到请求进行模拟,依 ...

  2. Redis问题——Error: 磁盘在使用中,或被另一个进程锁定。

    Redis出于对数据保护,默认只能本地客户端连接.远程连接就会出现以上错误.如何解决这一问题,看下: server -A,PC-A, 修改server-A的redis.conf:注释掉本地绑定: bi ...

  3. 实现 连续15签到记录_MySQL和Redis实现用户签到,你喜欢怎么实现?

    现在的网站和app开发中,签到是一个很常见的功能 如微博签到送积分,签到排行榜 微博签到 如移动app ,签到送流量等活动, 移动app签到 用户签到是提高用户粘性的有效手段,用的好能事半功倍! 下面 ...

  4. Redis 笔记(16)— info 指令和命令行工具(查看内存、状态、客户端连接数、监控服务器、扫描大key、采样服务器、执行批量命令等)

    Info 命令返回关于 Redis 服务器的各种信息和统计数值.通过给定可选的参数 section ,可以让命令只返回某一部分的信息. 1. 显示模块 server : 一般 Redis 服务器信息, ...

  5. Redis 笔记(15)— 管道 pipeline(客户端将批量命令打包发送用来节省网络开销)

    Redis 是一种基于客户端-服务端模型以及请求/响应协议的 TCP 服务.这意味着通常情况下一个请求会遵循以下步骤: 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以阻塞模式,等 ...

  6. Redis 笔记(14)— 持久化及数据恢复(数据持久方式 RDB 和 AOF、数据恢复、混合持久化)

    1. 持久化 所谓持久化是指将数据从内存中以某种形式同步到硬盘中,在 Redis 重启后能够根据硬盘中的记录恢复数据.Redis 持久化有两种方式,分别为 RDB(redis data base) [ ...

  7. Redis 笔记(13)— scan 和 keys 寻找特定前缀key 字段(命令格式、使用示例、定位大key)

    1. keys Redis 提供了一个简单暴力的指令 keys 用来列出所有满足特定正则字符串规则的 key. 127.0.0.1:6379> keys * (empty array) 127. ...

  8. Redis 笔记(12)— 单线程架构(非阻塞 IO、多路复用)和多个异步线程

    Redis 使用了单线程架构.非阻塞 I/O .多路复用模型来实现高性能的内存数据库服务.Redis 是单线程的.那么为什么说是单线程呢? Redis 在 Reactor 模型内开发了事件处理器,这个 ...

  9. Redis 笔记(11)— 文本协议 RESP(单行、多行字符串、整数、错误、数组、空值、空串格式、telnet 登录 redis)

    RESP 是 Redis 序列化协议Redis Serialization Protocol 的简写.它是一种直观的文本协议,优势在于实现异常简单,解析性能极好. ​ Redis 协议将传输的结构数据 ...

  10. Redis 笔记(10)— 发布订阅模式(发布订阅单个信道、订阅信道后的返回值分类、发布订阅多个信道)

    1. 发布-订阅概念 发布-订阅 模式包含两种角色,分别为发布者和订阅者. 订阅者可以订阅一个或者若干个频道(channel): 而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都可以收到此消 ...

最新文章

  1. Linux下的Memcache安装(含libevent的安装)
  2. Java获取小程序带参二维码(太阳码)
  3. c++ 8.整数加法实现
  4. 2020ICPC(上海) - Walker(分类讨论+二分)
  5. CSS3实现侧边栏快速定位的隐藏和消失
  6. formidable ajax上传,nodejs+express+ajax实现图片上传及显示
  7. 【CSS3】CSS3文本字体相关属性大全
  8. js入门·对象属性方法大总结
  9. powerDesign导出word操作步骤
  10. cad调了比例因子没反应_CAD教程:自由缩放命令的操作流程
  11. 一张图秒懂Android事件分发机制
  12. 四字母net域名值钱吗?四字母域名取名有什么技巧?
  13. mysql my.cnf 生效_mysql配置文件生效顺序
  14. 【QA】数学符号 word输入问题 在word里面怎么输入字母头顶上的那个小尖儿
  15. Resultful API
  16. 2022无线蓝牙耳机选哪个?盘点超热门的蓝牙耳机品牌推荐
  17. [面向对象程序设计] 汽车租赁系统(Java实现)
  18. 网吧服务器网络维护教程,网管员维护服务器过程中的反黑技巧
  19. Window Installer Clean Up好用的软件管理工具
  20. 噪声与振动控制行业的发展和展望

热门文章

  1. 神经影像(核磁共振)概念及数据分析学习
  2. 通达信大资金进出指标公式
  3. Node+puppeteer学习笔记(三)--API问题解决--切换frame和iframe框
  4. 谷歌gmail注册入口_如何删除您的Gmail帐户而不删除您的Google帐户
  5. mysql 完美卸载
  6. Linux 软件 缺少库查询
  7. RestAssured接口自动化框架学习
  8. Python 爬虫框架Scrapy Spiders学习
  9. 如何获取服务器的 CA 证书?
  10. 使用Apache Ignite瘦客户端– Apache Ignite内部博客