Python之队列和数据库
一 数据库
Python可以和数据库进行交互,这里也有一些交互的模块,比如MySQLdb、pymysql等,但是3.x 已经不再支持MySQLdb,所以你安装的时候会报错
我们以pymysql为例子:
1.1 创建连接
pymysql有几种创建连接的方式
import pymysql conn = pymysql.Connect(host="localhost",port=3306,user="root",password="123456",database="employee",charset
='utf8') conn1 = pymysql.connect(host="localhost",port=3306,user="root",password="123456",database="ofbiz",charset=
'utf8') conn2 = pymysql.Connection(host="localhost",port=3306,user="root",password="123456",database="ofbiz",charset=
'utf8')
其实connect==Connection=Connect
1.2 简单的例子
# 创建游标 cursor = conn.cursor()# 查询操作,并返回收影响行数 effect_row = cursor.execute("select * from empinfo")# 更新操作,并返回受影响行数 effect_row = cursor.execute("update empinfo set DeptID = 3 where empID = %s", (1,))# 创建多条记录,并返回受影响行数,执行多次 effect_row = cursor.executemany("insert into deptinfo(DeptName) values(%s)", [("财务部"),("公关部")]) print(effect_row)# 提交,不然无法保存新建或者修改的数据 conn.commit()# 关闭游标 cursor.close() # 关闭连接 conn.close()
1.3 获取查询数据
utils = DBUtils() conn = utils.getConnection(host,port,user,password,
db,charset) cursor = conn.cursor() # 查询数据 cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice") # 获取剩余结果的第一行数据 row_1 = cursor.fetchone() # 获取剩余结果前n行数据 rows_n = cursor.fetchmany(3)# 获取剩余结果所有数据 rows_all = cursor.fetchall() utils.foreach(rows_all) utils.commitAndClose(conn,cursor)
1.4 获取新创建数据的自增ID
utils = DBUtils() conn = utils.getConnection(host,port,user,password,
db,charset) cursor = conn.cursor() effect_row = cursor.executemany("insert into deptinfo(DeptName) values(%s)", [("市场部"), ("广告部")]) new_id = cursor.lastrowid print(new_id) utils.commitAndClose(conn,cursor)
1.5 移动游标
utils = DBUtils() conn = utils.getConnection(host,port,user,password,db,charset) cursor = conn.cursor() # 查询数据 cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice") # 移动游标:在fetch数据时按照顺序进行,可以使用cursor.scroll(num, mode)来移动游标位置,如 # 从0开始移动2行数据 cursor.scroll(2,"absolute") row_1 = cursor.fetchone() # 结果应该是第三行的数据 8002 utils.foreach(row_1) # 从当前游标8003开始往下移动2行数据, cursor.scroll(2, "relative") # 获取剩余结果的第一行数据,即8005 row_1 = cursor.fetchone() utils.foreach(row_1) utils.commitAndClose(conn,cursor)
1.6 fetch数据类型
fetch默认返回的是一个元组类型的数据,如果你希望返回字典类型数据,怎么办?
utils = DBUtils() conn = utils.getConnection(host,port,user,password,
db,charset) cursor = conn.cursor() # 游标设置为字典类型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 查询数据 cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice") row_1 = cursor.fetchone() utils.foreachMap(row_1) utils.commitAndClose(conn,cursor)
1.7 调用存储过程
utils = DBUtils() conn = utils.getConnection(host,port,user,password,db,charset) cursor = conn.cursor() # 游标设置为字典类型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 调用无参存储过程 等价于cursor.execute("call p2()") cursor.callproc('p2') # 调用有参存储过程 cursor.callproc('p1', args=("nicky",28)) # 获取执行完存储的参数,参数@开头 cursor.execute("select @p1,@_p1_1,") row_1 = cursor.fetchone() utils.foreachMap(row_1) utils.commitAndClose(conn,cursor)
1.8 防止pymysql SQL 注入
第一:字符串拼接造成的注入
比如:
utils = DBUtils() conn = utils.getConnection(host,port,user,password,db,charset) cursor = conn.cursor() user = "jacky' or '1'-- " deptno = "u1pass" sql = "select username,job from empinfo where username='%s' and deptno='%s'" % (user, deptno)# 拼接语句被构造成下面这样,永真条件,此时就注入成功了。因此要避免这种情况需使用pymysql提供的参数化查询。 # SELECT username,job From empinfo WHERE username='jacky' or 1'-- and deptno='%s'" % (user, deptno) row_count = cursor.execute(sql) row_1 = cursor.fetchone() utils.commitAndClose(conn,cursor)
解决办法: 使用pymysql提供的参数化查询
utils = DBUtils() conn = utils.getConnection(host,port,user,password,db,charset) cursor = conn.cursor() user = "jacky' or '1'-- " deptno = "u1pass" # 避免注入,使用pymysql提供的参数化语句 row_count = cursor.execute("select username,job from empinfo where username=%s and deptno=%s" % (user, deptno)) row_1 = cursor.fetchone() utils.commitAndClose(conn,cursor)
它在内部解析 的时候,会对特殊进行转义
第二:使用存mysql储过程动态执行SQL防注入
1.9 使用with简化连接过程
每次都连接关闭很麻烦,使用上下文管理,简化连接过程
import pymysql import contextlib #定义上下文管理器,连接后自动关闭连接 @contextlib.contextmanager def mysql(host, port, user, password, db, charset='utf8'):conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db, charset=charset)cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)try:yield cursorfinally:conn.commit()cursor.close()conn.close()# 执行sql with mysql() as cursor:print(cursor)row_count = cursor.execute("select * from invoice")row_1 = cursor.fetchone()print(row_count, row_1)
二 Memcached 相关python操作
import memcache
class MemcachedUtils(object):
def __init__(self,hostname,port):
self.hostname = hostname
self.port = port
def getMemcachedClient(self,debug):
#{'192.168.3.40:11211'} 或者['192.168.3.40:11211s']
# _servers = {"%s:%s"%(self.hostname,self.port)}
_servers = ["%s:%s" % (self.hostname, self.port)]
c = memcache.Client(_servers,debug=debug)
return c
tools = MemcachedUtils("192.168.3.40",11211)
client = tools.getMemcachedClient(True)
# add(k,v):添加键值对, 不能添加重复的key,否则报错
# client.add('foo','bar')
# replace():更新键值对,如果key存在则更新,不存在则异常
client.replace('foo','balance')
# set(k,v):设置键值对,相当于添加,和add的区别在于,它不存在则添加,存在则修改
client.set('dname','科技部')
# set_multiu({k1:v1,k2:v2}):设置多个键值对,它不存在则添加,存在则修改
client.set_multi({'food':'noodle','foo':'hah'})
# append(k): 修改指定key的值,在该值后面追加内容
client.append('foo','_end')
# prepend(k):修改指定key的值,在该值前面插入内容
client.prepend('foo','start_')
# incr(k): 根据k值使得对应值加1
client.incr('num')
# decr(k): 根据k值使得对应值减1
client.decr('num')
# delete(k): 根据key删除键值对
client.delete('foo')
# delete([k1,k2]): 根据多个key,删除多个键值对
client.delete_multi('x','y')
# get(k):查询键值对
client.get('foo')
# get_multi([k1,k2]):批量查询
client.get(['k1','k2'])
'''
每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,
会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如
果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值
),如此一来有可能出现非正常数据,则不允许修改。
'''
v = client.gets('product_count')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
client.cas('product_count', "899")
三 Redis
import redis
'''
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,
并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
'''
r = redis.Redis(host="192.168.3.40",port=6379)
r.set('foo','bar')
print(r.get('foo'))
'''
redis 连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,
每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,
这样就可以实现多个Redis实例共享一个连接池。
'''
pool = redis.ConnectionPool(host='192.168.3.40', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'bar1')
print(str(r.get('foo'),'utf8'))
3.1 string操作
'''
set(name, value, ex=None, px=None, nx=False, xx=False)
ex:过期时间(秒)
px: 过期时间(毫秒)
nx: 当只有Name不存在时,当前set操作才执行
xx: 只有当Name存在时,才执行当前操作
'''
r.set('foo','bar2',ex=1,xx=True)
# 结果为bar2
print(str(r.get('foo'),'utf8'))
r.set('foo','bar3',ex=1,nx=True)
# 结果为bar2
print(str(r.get('foo'),'utf8'))
'''
setnx(name,value): 不能存在才修改
setex(name,value,time): 设置过期时间(秒)
psetex(name, time_ms, value):设置过期时间(毫秒)
mset(*args, **kwargs): 批量设置值
mget:批量获取
mget('key1','key2') or mget(['key1','key2'])
getset(name, value): 设置新的值并获取原来的值
getrange(key, start, end)
'''
r.setnx('foo','bar3')
# 还是打印bar1,因为foo已经存在
print(str(r.get('foo'),'utf8'))
r.setex('address','NewYork Royal Road #4',1)
print(str(r.get('address'),'utf8'))
r.psetex('message',1000,'haha')
print(str(r.get('message'),'utf8'))
r.mset(name="Nicky",age=28,gender="male",hobby=["booking","sporting"])
r.mset({'key1':'value1','key2':'value2'})
resultList = r.mget(['name','age'])
resultList = r.mget('gender','hobby')
for result in resultList:
print(str(result,'utf8'))
x = r.getset('foo','new bar')
print(str(x,'utf8'))
# getrange(name,start,end): 根据字节获取子序列
r.set('name','郑涵月')
v = r.getrange('name',0,2) # 郑
# setrange(name, offset, value):修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
r.setrange('name',3,'欣寒')
v = r.get('name')
# strlen(name): 返回name对应值的长度
len = r.strlen('name') # 9
# incr(self, name, amount=1):自增
r.set('age','28')
r.incr('age',amount=2) # age=30
# incrbyfloat(self, name, amount=1.0):浮点数自增
r.incrbyfloat('age',amount=2.5) # 32.5
# decr(self, name, amount=1) 自减
# r.decr('age',amount=2)
# append(key, value):在key值后面加上value
r.append('name',',我喜欢你')
print(str(r.get('name'),'utf-8'))
3.2 hash操作
对提供的字符串等进行hash操作,我们就可以根据得到的hash值去标记桶,只要hash值相同,就往这个桶里扔
# hset(key,field,value):将哈希表 key 中的字段 field 的值设为 value 。
r.hset('myhash','color','yellow')
r.hset('myhash','length','5')
r.hset('myhash','weight','20')
# hash表中批量设置键值对
r.hmset('myhash', {'country':'china','address':'四川成都'})
# 在hash表中根据key获取value
v = r.hget('myhash','address')
# 在hash表中批量获取
v = r.hmget('myhash',['color','length','length','weight'])
# hgetall(name):获取该hash表中所有value
v = r.hgetall('myhash')
# hlen(name) 获取hash表对应的hash中键值对的个数
print(r.hlen('myhash'))
# hkeys(name): 获取name对应的hash中所有的key的值
# hvals(name): 获取name对应的hash中所有的value的值
r.hkeys('myhash')
r.hvals('myhash')
# hexists(name, key):检查name对应的hash是否存在当前传入的key
r.hexists('myhash','color')
# hdel(name,*keys)将name对应的hash中指定key的键值对删除
r.hdel('myhahs',['k1','k2'])
# hincrby(name,key,amount=1):自增name对应的hash中的指定key的值,不存在则创建key=amount
# hincrbyfloat(name, key, amount=1.0) 同理
# hscan(name, cursor=0, match=None, count=None)
# 量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆
# name,redis的name
# cursor,游标(基于游标分批取获取数据)
# match,匹配指定key,默认None 表示所有的key
# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
# 如:
# 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
# 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
# ...
# 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕
# hscan_iter(name, match=None, count=None):利用yield封装hscan创建生成器,实现分批去redis中获取数据
# match,匹配指定key,默认None 表示所有的key
# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
3.3 list操作
r = redis.Redis(host="192.168.3.40",port=6379)
# lpush(name,values):在name对应的list中添加元素,每个新的元素都添加到列表的最左边
# rpush(name, values) 表示从右向左操作
# r.lpush('numList',11,22,33)
# r.rpush('numList',44,55,66)
# lpushx(name,value):
# rpushx(name, value) 表示从右向左操作
# r.lpushx('numList',11)
# llen(name):name对应的list元素的个数
v = r.llen('numList')
'''
linsert(name, where, refvalue, value)):在name对应的列表的某一个值前或后插入一个新值
# 参数:
name,redis的name
where,BEFORE或AFTER
refvalue,标杆值,即:在它前后插入数据
value,要插入的数据
'''
'''
r.lset(name, index, value):# r.lset(name, index, value):
# name,redis的name
# index,list的索引位置
# value,要设置的值
'''
r.lset('foo',2,'hah')
# r.lrem(name, value, num):在name对应的list中删除指定的值
# name,redis的name
# value,要删除的值
# num, num=0,删除列表中所有的指定值;
# num=2,从前到后,删除2个;
# num=-2,从后向前,删除2个
# lpop(name):在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
# rpop:从右往左操作
# lindex(name, index):在name对应的列表中根据索引获取列表元素
elements = r.lindex('numList',11)
# lrange(name, start, end)
# name,redis的name
# start,索引的起始位置
# end,索引结束位置
r.lrange('numList',0,2)
# ltrim(name, start, end):在name对应的列表中移除没有在start-end索引之间的值
r.ltrim('NUMlIST',2,3)
# rpoplpush(src, dst):从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
# src,要取数据的列表的name
# dst,要添加数据的列表的name
r.lpush('list1',20)
r.lpush('list2',10,30,40)
r.rpoplpush('list2','list1') # 结果就是[20 10]
# 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
# src,取出并要移除元素的列表对应的name
# dst,要插入元素的列表对应的name
# r.brpoplpush('list1', 'list2', timeout=2)
# r.pop(src)从给定列表的末尾处取出一个元素,并从队列移除
# r.lpop('list2')
# 自定义增量迭代
def list_iter(r,name):
count = r.llen(name)
for index in range(count):
yield r.lindex(name,index)
def foreach(r,name):
elements = list_iter(r,name)
for item in elements:
print(str(item, "UTF-8") + " ")
foreach(r,'list1')
foreach(r,'list2')
3.4 set操作
'''
Set集合就是不允许重复的列表
'''
# sadd(name,values):向集合添加元素
r.sadd('文科','语文','数学','英语','文综')
r.sadd('理科','语文','数学','英语','理综')
# scard(name):获取name对应的集合中元素个数
print(r.scard('文科'))
# sdiff(keys, *args):在第一个name对应的集合中且不在其他name对应的集合的元素集合
elements = r.sdiff('文科','理科')
for i in elements:
print(str(i,"UTF-8"))
# sdiffstore(dest, keys, *args)获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
r.sdiffstore('高一','文科','理科')
# sinter(keys, *args): 获取集合的交集
elements = r.sinter('文科','理科')
for i in elements:
print(str(i,"UTF-8"))
# sinterstore(dest, keys, *args):获取多个集合交集,然后放到一个新的目的集合
r.sinterstore('高一','文科','理科')
# sunion(keys, *args):获取多个集合的并集
r.sunion('文科','理科')
# sunionstore(dest,keys, *args):获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
r.sunionstore('高三','文科','理科')
# sismember(name, value): 检查value是否是name集合成员
r.sismember('文科','语文')
# smembers(name):获取某个集合所有成员
r.smembers('理科')
# smove(src, dst, value):将某一个成员从src移到dest集合中
r.smove('文科','理科','文综')
for i in r.smembers('理科'):
print(str(i,"UTF-8"))
# spop(name): 从集合的右侧(尾部)移除一个成员,并将其返回
r.spop('高一')
# srandmember(name, numbers):从name对应的集合中随机获取 numbers 个元素
r.srandmember('文科',2)
# 在name对应的集合中删除某些值
r.srem('高一', '语文','数学')
'''
用于增量迭代分批获取元素,避免内存消耗太大
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
'''
elements = r.sscan('文科',count=2)
for item in elements[1]:
print(str(item,'UTF-8'))
3.5 有序集合
'''
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,
所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
'''
# zadd(name, *args, **kwargs): 添加元素,且需要提供分数
r.zadd('animals','lions',1,'tiger',2,'bulk',3)
r.zadd('vegetables',tomato=1,eggplant=2)
# zcard(name):获取name对应的有序集合元素的数量
r.zcard('animals')
#zcount(name, min, max): 计算分数在[min,max]之间的元素的数量
r.zcount('animals',2,3)
# zincrby(name, value, amount):自增name对应的有序集合的 name 对应的分数
r.zincrby('vegetables','tomato',amount=40)
'''
可以按照范围获取数据
r.zrange( name, start, end, desc=False, withscores=False,score_cast_func=float)
# name,redis的name
# start,有序集合索引起始位置(非分数)
# end,有序集合索引结束位置(非分数),如果为-1,表示区全部数据,-2表示倒数第2为止
# desc,排序规则,默认按照分数从小到大排序
# withscores,是否获取元素的分数,默认只获取元素的值
# score_cast_func,对分数进行数据转换的函数
'''
#r.zadd('moutains','泰山',80,'黄山',85,'峨眉山',90,'衡山',75,'恒山',60,'武当山','75','青城山',70)
elements = r.zrange('moutains',start=0,end=-1,desc=True,score_cast_func=int)
'''
从大到小排序,不用我们自己指定是否降序
zrevrange(name, start, end, withscores=False, score_cast_func=float)
'''
elements = r.zrevrange('moutains',start=0,end=3,score_cast_func=int)
'''
zrangebyscore(name, min, max, start=None, num=None, withscores=False,score_cast_func=float)
按照分数范围获取name对应的有序集合的元素,自动排序
# name:redis的name
# min: 最小分数值
# max: 最大分数值
# start: 从分数的结果哪一个索引位开始
# num: 从索引位开始取多少个数据
'''
elements = r.zrangebyscore('moutains',min=75,max=100,start=1,num=2,score_cast_func=int)
'''
从大到小排序
zrevrangebyscore(name, max, min, start=None, num=None, withscores=False,score_cast_func=float)
'''
elements = r.zrevrangebyscore('moutains',min=75,max=100,start=1,num=2,score_cast_func=int)
# zrank(name, value) 获取某个值在 name对应的有序集合中的排行,升序
# zrevrank(name, value),从大到小排序,降序
rank = r.zrank('moutains','武当山')
rank = r.zrevrank('moutains','武当山')
# zrem(name,values):删除name对应的有序集合中值是values的成员
r.zrem('moutains',['武当山','青城山'])
# zremrangebyrank(name, min, max):根据rank进行范围删除
elements = r.zremrangebyrank('moutains',min=0,max=2)
# zremrangebyscore(name, min, max):根据分数范围删除
elements = r.zremrangebyrank('moutains',min=60,max=70)
# zscore(name, value):获取name对应有序集合中 value 对应的分数
r.zscore('moutains','峨眉山')
'''
zinterstore(dest, keys, aggregate=None)
# 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为: SUM MIN MAX
zunionstore(dest, keys, aggregate=None)
# 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为: SUM MIN MAX
'''
'''
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
'''
3.6 其他命令
# 根据删除redis名字中的任意数据类型
r.delete('animal','animals')
# 检测redis的name是否存在
v = r.exists('animals')
# 根据正则获取redis的name
s = r.keys("*")
# 为某个redis的某个name设置超时时间
r.expire('mountains',1000)
# 对redis的name重命名为
r.rename('animal', 'animals')
# move(name, db))将redis的某个值移动到指定的db下
# 随机获取一个redis的name(不删除)
r.randomkey()
# type(name):获取name对应值的类型
3.7 管道
Redis python每一次执行请求都会在连接池申请连接,和归还连接,如果想要在一个请求中执行多个命令,你可以使用piepline,并且默认情况下一次pipline 是原子性操作。
pool = redis.ConnectionPool(host='192.168.3.40', port=6379)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
pipe.set('name', 'alex')
pipe.lpush('elements',12,3,4,5)
pipe.execute()
3.8 发布订阅
class MessageQueue:
def __init__(self):
self.__conn = redis.Redis(host="192.168.3.40",port=6379)
self.subscribe_channel = 'music'
self.publish_channel = 'music'
def publish(self, msg):
self.__conn.publish(self.publish_channel, msg)
return True
def subscribe(self):
pub = self.__conn.pubsub()
pub.subscribe(self.subscribe_channel)
pub.parse_response()
return pub
mq = MessageQueue()
sub = mq.subscribe()
while True:
msg = sub.parse_response()
print(msg)
from db.tmp import MessageQueue
mq = MessageQueue()
mq.public('你好美')
四 RabbitMQ
import pika
'''
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
'''
class MQTools(object):
def getConnection(self):
params = pika.ConnectionParameters(host='192.168.3.40',port=22222)
conn = pika.BlockingConnection(params)
return conn
class Producer(object):
def handle(self):
tools = MQTools()
conn = tools.getConnection()
channel = conn.channel()
channel.queue_declare(queue='animals')
channel.basic_publish(exchange='',routing_key='animals',body='tiger')
conn.close()
class Consumer(object):
def handle(self):
tools = MQTools()
conn = tools.getConnection()
channel = conn.channel()
channel.queue_declare(queue='animals')
channel.basic_consume(self.callback,queue='animals',no_ack=True)
channel.start_consuming()
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
'''
1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channelis closed, connection is
closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中
'''
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.queue_declare(queue='animals')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,queue='animals',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()
'''
2 durable 消息不丢失
'''
# 生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='animal', durable=True)
channel.basic_publish(exchange='',routing_key='animal',
body='lions!',properties=pika.BasicProperties(delivery_mode=2, # make message persistent
))
print(" [x] Sent'Hello World!'")
connection.close()
# 消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,queue='hello',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()
'''
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取奇数序列的任务,消费者1去队列中
获取偶数序列的任务。channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
'''
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='hello',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()
'''
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。
所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
'''
# 发布者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent%r" % message)
connection.close()
# 订阅者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',queue=queue_name)
print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
'''
5、关键字发送
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,
即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字
判定应该将数据发送至指定队列。
'''
#生产者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(" [x] Sent%r:%r" % (severity, message))
connection.close()
# 消费者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)
print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
'''
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,
exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列
# 表示可以匹配 0 个或多个单词
* 表示只能匹配一个单词
'''
# 生产者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent%r:%r" % (routing_key, message))
connection.close()
# 消费者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage:%s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)
print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
Python之队列和数据库相关推荐
- python如何删除mysql数据库_python删除数据mysql数据库连接
Python学习之旅:访问MySQL数据库 Python学习之旅:访问MySQL数据库 MySQL是Web世界中使用最广泛的数据库服务器.为服务器端设计的数据库,能承受高并发访问. python如何使 ...
- python有序队列_Python 队列
所谓队列 队列是有序集合,添加操作发生在"尾部",移除操作则发生在"头部". 新元素从尾部进入 队列,然后一直向前移动到头部,直到成为下一个被移除的元素. 新添 ...
- python资料库-Python对接六大主流数据库(只需三步)
作为近两年来最火的编程语言的python,受到广大程序员的追捧必然是有其原因的,如果要挑出几点来讲的话,第一条那就python语法简洁,易上手,第二条呢? 便是python有着极其丰富的第三方的库. ...
- python资料库-Python对接六大主流数据库,只需三步
作为近两年来最火的编程语言的python,受到广大程序员的追捧必然是有其原因的,如果要挑出几点来讲的话,第一条那就python语法简洁,易上手,第二条呢? 便是python有着极其丰富的第三方的库. ...
- python资料库-Python操作三大主流数据库
学会使用的技术栈:python flask redis mongoDB mysql 第1章 数据库简介 简单介绍Mysql.数据库简介.导学篇 第2章 mysql基础 XAMPP 集成好的 最流行的P ...
- 【转载】Python对接六大主流数据库,只需三步
转载网址: https://developer.51cto.com/art/201907/600444.htm Mysql 安装pymysql Oracle python中对接oracle数据库,使用 ...
- Python应用实战系列-如何通过Python来操作Oracle数据库:cx_Oracle
最近需要将一批数据从csv文件中迁移到Oracle数据库中,打算用Python来实现,趁着这个机会,写一篇博客学习总结一些如何使用Python来操作Oracle数据库. 1 安装与导入 Python操 ...
- python 实现队列功能 queue insert() pop()
def calculate_detection_num(self, calcu_list, detect_num):"""计算一段次数内平均识别个数"" ...
- MongoDB数据库(8.Python中使用mongodb数据库以及pymongo模块用法)
在Python中使用MongoDB数据库,首先要下载pymongo模块 直接在命令行 pip install pymongo 就可以了 Python中使用pymongo模块对MongoDB数据 ...
最新文章
- 如何在dnn禁止复制的问题
- poj 1474 Video Surveillance - 求多边形有没有核
- JavaScript面向对象编程(1)-- 基础
- 30种优化查询速度的方法
- linux内核路由反向检查,反向路径过滤——reverse path filter
- 【Hibernate】could not instantiate class.. from tuple] with root cause
- Spring构造注入重载
- rds 如何学习数据库_如何将本地数据库迁移到云数据库 RDS 上?
- 详解HTML5网页结构
- 海龟交易法则06_掌握优势
- windows创建服务删除服务
- Arduino UNO测试BME280温湿度气压传感器
- C#开发实战1200例(第II卷)目录
- 日记侠:你的文章为什么阅读量会那么高?
- 电场刺激响应性和AIE水凝胶/调控发射波长及亮度AIE微球/AIE糖肽聚合物的研究
- 测试高考体育成绩的软件,高考体育成绩查询
- 国产男装「升级潮」下,九牧王、劲霸、海澜之家们顺利「上分」了吗?
- 海盗分赃问题-----简化问题,分而治之
- js 中国标准时间,时间戳 ,yyyy-mm-dd格式之前相互转换
- php 豆瓣isbn接口,ISBN书号查询
热门文章
- 山西专科学校计算机专业排名,河南单招计算机专业专科学校排名
- 在js中访问html页面,javascript – 在IE9的html页面中访问js里面的全局函数
- JavaScript变量高级定义之Object.defineProperty()方法讲解
- tomcat 在WIN10 上运行出现500错误的解决方法
- php包含文件不存在,PHP包含文件错误,服务器有该文件,直接访问提示不存在
- mysql binlog查看工具_数据同步工具otter(一)谈谈binlog和canal
- oracle ogg 12安装,Oracle GoldenGate Studio 12.2.1.3安装
- Python快速读取文件中指定的一行或多行
- python根据行名称生成二维数组
- 创建目录_聊聊Word创建目录那些事儿