一 数据库

Python可以和数据库进行交互,这里也有一些交互的模块,比如MySQLdb、pymysql等,但是3.x 已经不再支持MySQLdb,所以你安装的时候会报错

我们以pymysql为例子:

1.1 创建连接

pymysql有几种创建连接的方式

import pymysql
conn = pymysql.Connect(host="localhost",port=3306,user="root",password="123456",database="employee",charset
='utf8')
conn1 = pymysql.connect(host="localhost",port=3306,user="root",password="123456",database="ofbiz",charset=
'utf8')
conn2 = pymysql.Connection(host="localhost",port=3306,user="root",password="123456",database="ofbiz",charset=
'utf8')

其实connect==Connection=Connect

1.2 简单的例子

# 创建游标
cursor = conn.cursor()# 查询操作,并返回收影响行数
effect_row = cursor.execute("select * from empinfo")# 更新操作,并返回受影响行数
effect_row = cursor.execute("update empinfo set DeptID = 3 where empID = %s", (1,))# 创建多条记录,并返回受影响行数,执行多次
effect_row = cursor.executemany("insert into deptinfo(DeptName) values(%s)", [("财务部"),("公关部")])
print(effect_row)# 提交,不然无法保存新建或者修改的数据
conn.commit()# 关闭游标
cursor.close()
# 关闭连接
conn.close()

1.3 获取查询数据

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,
db,charset)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice")
# 获取剩余结果的第一行数据
row_1 = cursor.fetchone()
# 获取剩余结果前n行数据
rows_n = cursor.fetchmany(3)# 获取剩余结果所有数据
rows_all = cursor.fetchall()
utils.foreach(rows_all)
utils.commitAndClose(conn,cursor)

1.4 获取新创建数据的自增ID

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,
db,charset)
cursor = conn.cursor()
effect_row = cursor.executemany("insert into deptinfo(DeptName) values(%s)", [("市场部"), ("广告部")])
new_id = cursor.lastrowid
print(new_id)
utils.commitAndClose(conn,cursor)

1.5 移动游标

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice")
# 移动游标:在fetch数据时按照顺序进行,可以使用cursor.scroll(num, mode)来移动游标位置,如
# 从0开始移动2行数据
cursor.scroll(2,"absolute")
row_1 = cursor.fetchone()
# 结果应该是第三行的数据 8002
utils.foreach(row_1)
# 从当前游标8003开始往下移动2行数据,
cursor.scroll(2, "relative")
# 获取剩余结果的第一行数据,即8005
row_1 = cursor.fetchone()
utils.foreach(row_1)
utils.commitAndClose(conn,cursor)

1.6 fetch数据类型

fetch默认返回的是一个元组类型的数据,如果你希望返回字典类型数据,怎么办?

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,
db,charset)
cursor = conn.cursor()
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# 查询数据
cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice")
row_1 = cursor.fetchone()
utils.foreachMap(row_1)
utils.commitAndClose(conn,cursor)

1.7 调用存储过程

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# 调用无参存储过程  等价于cursor.execute("call p2()")
cursor.callproc('p2')
# 调用有参存储过程
cursor.callproc('p1', args=("nicky",28))
# 获取执行完存储的参数,参数@开头
cursor.execute("select @p1,@_p1_1,")
row_1 = cursor.fetchone()
utils.foreachMap(row_1)
utils.commitAndClose(conn,cursor)

1.8 防止pymysql SQL 注入

第一:字符串拼接造成的注入

比如:

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
user = "jacky' or '1'-- "
deptno = "u1pass"
sql = "select username,job from empinfo where username='%s' and deptno='%s'" % (user, deptno)# 拼接语句被构造成下面这样,永真条件,此时就注入成功了。因此要避免这种情况需使用pymysql提供的参数化查询。
# SELECT username,job From empinfo WHERE username='jacky' or 1'-- and deptno='%s'" % (user, deptno)
row_count = cursor.execute(sql)
row_1 = cursor.fetchone()
utils.commitAndClose(conn,cursor)

解决办法: 使用pymysql提供的参数化查询

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
user = "jacky' or '1'-- "
deptno = "u1pass"
# 避免注入,使用pymysql提供的参数化语句
row_count = cursor.execute("select username,job from empinfo where username=%s and deptno=%s" % (user, deptno))
row_1 = cursor.fetchone()
utils.commitAndClose(conn,cursor)

它在内部解析 的时候,会对特殊进行转义

第二:使用存mysql储过程动态执行SQL防注入

 

1.9 使用with简化连接过程

每次都连接关闭很麻烦,使用上下文管理,简化连接过程

 

import pymysql
import contextlib
#定义上下文管理器,连接后自动关闭连接
@contextlib.contextmanager
def mysql(host, port, user, password, db, charset='utf8'):conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db, charset=charset)cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)try:yield cursorfinally:conn.commit()cursor.close()conn.close()# 执行sql
with mysql() as cursor:print(cursor)row_count = cursor.execute("select * from invoice")row_1 = cursor.fetchone()print(row_count, row_1)
 
 
二 Memcached 相关python操作

import memcache

class MemcachedUtils(object):
    def __init__(self,hostname,port):
        self.hostname = hostname
        self.port = port

def getMemcachedClient(self,debug):
        #{'192.168.3.40:11211'} 或者['192.168.3.40:11211s']
        # _servers = {"%s:%s"%(self.hostname,self.port)}
       
_servers = ["%s:%s" % (self.hostname, self.port)]
        c = memcache.Client(_servers,debug=debug)
        return c

tools = MemcachedUtils("192.168.3.40",11211)
client = tools.getMemcachedClient(True)

# add(k,v):添加键值对, 不能添加重复的key,否则报错
# client.add('foo','bar')

# replace():更新键值对,如果key存在则更新,不存在则异常
client.replace('foo','balance')

# set(k,v):设置键值对,相当于添加,和add的区别在于,它不存在则添加,存在则修改
client.set('dname','科技部')

# set_multiu({k1:v1,k2:v2}):设置多个键值对,它不存在则添加,存在则修改
client.set_multi({'food':'noodle','foo':'hah'})

# append(k): 修改指定key的值,在该值后面追加内容
client.append('foo','_end')
# prepend(k):修改指定key的值,在该值前面插入内容
client.prepend('foo','start_')

# incr(k): 根据k值使得对应值加1
client.incr('num')
# decr(k): 根据k值使得对应值减1
client.decr('num')

# delete(k): 根据key删除键值对
client.delete('foo')
# delete([k1,k2]): 根据多个key,删除多个键值对
client.delete_multi('x','y')

# get(k):查询键值对
client.get('foo')
# get_multi([k1,k2]):批量查询
client.get(['k1','k2'])

'''
每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,
会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如
果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值
),如此一来有可能出现非正常数据,则不允许修改。
'''
v = client.gets('product_count')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
client.cas('product_count', "899")

 
三 Redis

import redis

'''
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,
并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
'''

r = redis.Redis(host="192.168.3.40",port=6379)
r.set('foo','bar')
print(r.get('foo'))

'''
redis 连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,
每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,
这样就可以实现多个Redis实例共享一个连接池。
'''
pool = redis.ConnectionPool(host='192.168.3.40', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'bar1')
print(str(r.get('foo'),'utf8'))

3.1 string操作

'''
set(name, value, ex=None, px=None, nx=False, xx=False)
ex:过期时间(秒)
px: 过期时间(毫秒)
nx: 当只有Name不存在时,当前set操作才执行
xx: 只有当Name存在时,才执行当前操作
'''
r.set('foo','bar2',ex=1,xx=True)
# 结果为bar2
print
(str(r.get('foo'),'utf8'))

r.set('foo','bar3',ex=1,nx=True)
# 结果为bar2
print
(str(r.get('foo'),'utf8'))

'''
setnx(name,value): 不能存在才修改
setex(name,value,time): 设置过期时间(秒)
psetex(name, time_ms, value):设置过期时间(毫秒)
mset(*args, **kwargs): 批量设置值
mget:批量获取
mget('key1','key2') or mget(['key1','key2'])
getset(name, value): 设置新的值并获取原来的值
getrange(key, start, end)
'''
r.setnx('foo','bar3')
# 还是打印bar1,因为foo已经存在
print
(str(r.get('foo'),'utf8'))

r.setex('address','NewYork Royal Road #4',1)
print(str(r.get('address'),'utf8'))

r.psetex('message',1000,'haha')
print(str(r.get('message'),'utf8'))

r.mset(name="Nicky",age=28,gender="male",hobby=["booking","sporting"])
r.mset({'key1':'value1','key2':'value2'})

resultList = r.mget(['name','age'])
resultList = r.mget('gender','hobby')
for result in resultList:
    print(str(result,'utf8'))

x = r.getset('foo','new bar')
print(str(x,'utf8'))

 

# getrange(name,start,end): 根据字节获取子序列
r.set('name','郑涵月')
v = r.getrange('name',0,2) # 郑

# setrange(name, offset, value):修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
r.setrange('name',3,'欣寒')
v = r.get('name')

# strlen(name): 返回name对应值的长度
len = r.strlen('name') # 9

# incr(self, name, amount=1):自增
r.set('age','28')
r.incr('age',amount=2) # age=30

# incrbyfloat(self, name, amount=1.0):浮点数自增
r.incrbyfloat('age',amount=2.5) # 32.5

# decr(self, name, amount=1) 自减
# r.decr('age',amount=2)

# append(key, value):在key值后面加上value
r.append('name',',我喜欢你')
print(str(r.get('name'),'utf-8'))

 
3.2 hash操作
对提供的字符串等进行hash操作,我们就可以根据得到的hash值去标记桶,只要hash值相同,就往这个桶里扔

# hset(key,field,value):将哈希表 key 中的字段 field 的值设为 value 。
r.hset('myhash','color','yellow')
r.hset('myhash','length','5')
r.hset('myhash','weight','20')

# hash表中批量设置键值对
r.hmset('myhash', {'country':'china','address':'四川成都'})

# 在hash表中根据key获取value
v = r.hget('myhash','address')

# 在hash表中批量获取
v = r.hmget('myhash',['color','length','length','weight'])

# hgetall(name):获取该hash表中所有value
v = r.hgetall('myhash')

# hlen(name) 获取hash表对应的hash中键值对的个数
print
(r.hlen('myhash'))

# hkeys(name): 获取name对应的hash中所有的key的值
# hvals(name): 获取name对应的hash中所有的value的值
r.hkeys('myhash')
r.hvals('myhash')

# hexists(name, key):检查name对应的hash是否存在当前传入的key
r.hexists('myhash','color')

# hdel(name,*keys)将name对应的hash中指定key的键值对删除
r.hdel('myhahs',['k1','k2'])

# hincrby(name,key,amount=1):自增name对应的hash中的指定key的值,不存在则创建key=amount
# hincrbyfloat(name, key, amount=1.0) 同理

# hscan(name, cursor=0, match=None, count=None)
# 量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆
# name,redis的name
# cursor,游标(基于游标分批取获取数据)
# match,匹配指定key,默认None 表示所有的key
# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
# 如:
# 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
# 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
# ...
# 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕

# hscan_iter(name, match=None, count=None):利用yield封装hscan创建生成器,实现分批去redis中获取数据
# match,匹配指定key,默认None 表示所有的key
# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数

 
3.3 list操作

r = redis.Redis(host="192.168.3.40",port=6379)

# lpush(name,values):在name对应的list中添加元素,每个新的元素都添加到列表的最左边
# rpush(name, values) 表示从右向左操作
# r.lpush('numList',11,22,33)
# r.rpush('numList',44,55,66)

# lpushx(name,value):
# rpushx(name, value) 表示从右向左操作
# r.lpushx('numList',11)

# llen(name):name对应的list元素的个数
v = r.llen('numList')

'''
linsert(name, where, refvalue, value)):在name对应的列表的某一个值前或后插入一个新值
# 参数:
name,redis的name
where,BEFORE或AFTER
refvalue,标杆值,即:在它前后插入数据
value,要插入的数据
'''

'''
r.lset(name, index, value):# r.lset(name, index, value):
# name,redis的name
    # index,list的索引位置
    # value,要设置的值
'''
r.lset('foo',2,'hah')

# r.lrem(name, value, num):在name对应的list中删除指定的值
# name,redis的name
    # value,要删除的值
    # num,  num=0,删除列表中所有的指定值;
           # num=2,从前到后,删除2个;
           # num=-2,从后向前,删除2个

# lpop(name):在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
# rpop:从右往左操作

# lindex(name, index):在name对应的列表中根据索引获取列表元素
elements = r.lindex('numList',11)

# lrange(name, start, end)
# name,redis的name
# start,索引的起始位置
# end,索引结束位置
r.lrange('numList',0,2)
# ltrim(name, start, end):在name对应的列表中移除没有在start-end索引之间的值
r.ltrim('NUMlIST',2,3)

# rpoplpush(src, dst):从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
# src,要取数据的列表的name
# dst,要添加数据的列表的name
r.lpush('list1',20)
r.lpush('list2',10,30,40)
r.rpoplpush('list2','list1') # 结果就是[20 10]

# 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
# src,取出并要移除元素的列表对应的name
# dst,要插入元素的列表对应的name
# r.brpoplpush('list1', 'list2', timeout=2)

# r.pop(src)从给定列表的末尾处取出一个元素,并从队列移除
# r.lpop('list2')
# 自定义增量迭代
def list_iter(r,name):
    count = r.llen(name)
    for index in range(count):
        yield r.lindex(name,index)

def foreach(r,name):
    elements = list_iter(r,name)
    for item in elements:
        print(str(item, "UTF-8") + " ")
  
foreach(r,'list1')
foreach(r,'list2')

 
3.4 set操作

'''
Set集合就是不允许重复的列表
'''
# sadd(name,values):向集合添加元素
r.sadd('文科','语文','数学','英语','文综')
r.sadd('理科','语文','数学','英语','理综')

# scard(name):获取name对应的集合中元素个数
print
(r.scard('文科'))

# sdiff(keys, *args):在第一个name对应的集合中且不在其他name对应的集合的元素集合
elements = r.sdiff('文科','理科')
for i in elements:
    print(str(i,"UTF-8"))

# sdiffstore(dest, keys, *args)获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
r.sdiffstore('高一','文科','理科')

# sinter(keys, *args): 获取集合的交集
elements = r.sinter('文科','理科')
for i in elements:
    print(str(i,"UTF-8"))

# sinterstore(dest, keys, *args):获取多个集合交集,然后放到一个新的目的集合
r.sinterstore('高一','文科','理科')

# sunion(keys, *args):获取多个集合的并集
r.sunion('文科','理科')

# sunionstore(dest,keys, *args):获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
r.sunionstore('高三','文科','理科')

# sismember(name, value): 检查value是否是name集合成员
r.sismember('文科','语文')

# smembers(name):获取某个集合所有成员
r.smembers('理科')

# smove(src, dst, value):将某一个成员从src移到dest集合中
r.smove('文科','理科','文综')

for i in r.smembers('理科'):
    print(str(i,"UTF-8"))

# spop(name): 从集合的右侧(尾部)移除一个成员,并将其返回
r.spop('高一')

# srandmember(name, numbers):从name对应的集合中随机获取 numbers 个元素
r.srandmember('文科',2)

# 在name对应的集合中删除某些值
r.srem('高一', '语文','数学')

'''
用于增量迭代分批获取元素,避免内存消耗太大
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
'''
elements  = r.sscan('文科',count=2)
for item in elements[1]:
    print(str(item,'UTF-8'))

 
3.5 有序集合

'''
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,
所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
'''

# zadd(name, *args, **kwargs): 添加元素,且需要提供分数
r.zadd('animals','lions',1,'tiger',2,'bulk',3)
r.zadd('vegetables',tomato=1,eggplant=2)

# zcard(name):获取name对应的有序集合元素的数量
r.zcard('animals')

#zcount(name, min, max): 计算分数在[min,max]之间的元素的数量
r.zcount('animals',2,3)

# zincrby(name, value, amount):自增name对应的有序集合的 name 对应的分数
r.zincrby('vegetables','tomato',amount=40)

'''
可以按照范围获取数据
r.zrange( name, start, end, desc=False, withscores=False,score_cast_func=float)
# name,redis的name
# start,有序集合索引起始位置(非分数)
# end,有序集合索引结束位置(非分数),如果为-1,表示区全部数据,-2表示倒数第2为止
# desc,排序规则,默认按照分数从小到大排序
# withscores,是否获取元素的分数,默认只获取元素的值
# score_cast_func,对分数进行数据转换的函数
'''
#r.zadd('moutains','泰山',80,'黄山',85,'峨眉山',90,'衡山',75,'恒山',60,'武当山','75','青城山',70)
elements = r.zrange('moutains',start=0,end=-1,desc=True,score_cast_func=int)

'''
从大到小排序,不用我们自己指定是否降序
zrevrange(name, start, end, withscores=False, score_cast_func=float)
'''
elements = r.zrevrange('moutains',start=0,end=3,score_cast_func=int)

'''
zrangebyscore(name, min, max, start=None, num=None, withscores=False,score_cast_func=float)
按照分数范围获取name对应的有序集合的元素,自动排序
# name:redis的name
# min: 最小分数值
# max: 最大分数值
# start: 从分数的结果哪一个索引位开始
# num: 从索引位开始取多少个数据
'''

elements = r.zrangebyscore('moutains',min=75,max=100,start=1,num=2,score_cast_func=int)

'''
从大到小排序
zrevrangebyscore(name, max, min, start=None, num=None, withscores=False,score_cast_func=float)
'''
elements = r.zrevrangebyscore('moutains',min=75,max=100,start=1,num=2,score_cast_func=int)

# zrank(name, value) 获取某个值在 name对应的有序集合中的排行,升序

# zrevrank(name, value),从大到小排序,降序
rank = r.zrank('moutains','武当山')
rank = r.zrevrank('moutains','武当山')

# zrem(name,values):删除name对应的有序集合中值是values的成员
r.zrem('moutains',['武当山','青城山'])

# zremrangebyrank(name, min, max):根据rank进行范围删除
elements = r.zremrangebyrank('moutains',min=0,max=2)

# zremrangebyscore(name, min, max):根据分数范围删除

elements = r.zremrangebyrank('moutains',min=60,max=70)

# zscore(name, value):获取name对应有序集合中 value 对应的分数
r.zscore('moutains','峨眉山')

'''
zinterstore(dest, keys, aggregate=None)
# 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM MIN  MAX

zunionstore(dest, keys, aggregate=None)
# 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM MIN  MAX
'''

'''
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
'''

 
3.6 其他命令

# 根据删除redis名字中的任意数据类型
r.delete('animal','animals')

# 检测redis的name是否存在
v = r.exists('animals')

#  根据正则获取redis的name
s = r.keys("*")

# 为某个redis的某个name设置超时时间
r.expire('mountains',1000)

# 对redis的name重命名为
r.rename('animal', 'animals')

# move(name, db))将redis的某个值移动到指定的db下

# 随机获取一个redis的name(不删除)
r.randomkey()

# type(name):获取name对应值的类型

 
3.7 管道
Redis python每一次执行请求都会在连接池申请连接,和归还连接,如果想要在一个请求中执行多个命令,你可以使用piepline,并且默认情况下一次pipline 是原子性操作。

pool = redis.ConnectionPool(host='192.168.3.40', port=6379)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
pipe.set('name', 'alex')
pipe.lpush('elements',12,3,4,5)
pipe.execute()

 
3.8 发布订阅

class MessageQueue:

def __init__(self):
        self.__conn = redis.Redis(host="192.168.3.40",port=6379)
        self.subscribe_channel = 'music'
       
self.publish_channel = 'music'

def publish(self, msg):
        self.__conn.publish(self.publish_channel, msg)
        return True

def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.subscribe_channel)
        pub.parse_response()
        return pub

mq = MessageQueue()
sub = mq.subscribe()
while True:
    msg = sub.parse_response()
    print(msg)

 
 
 

from db.tmp import MessageQueue

mq = MessageQueue()
mq.public('你好美')

 
 
四 RabbitMQ

import pika

'''
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
'''
class MQTools(object):
    def getConnection(self):
        params = pika.ConnectionParameters(host='192.168.3.40',port=22222)
        conn = pika.BlockingConnection(params)
        return conn

class Producer(object):
    def handle(self):
        tools = MQTools()
        conn = tools.getConnection()
        channel = conn.channel()
        channel.queue_declare(queue='animals')
        channel.basic_publish(exchange='',routing_key='animals',body='tiger')
        conn.close()

class Consumer(object):
    def handle(self):
        tools = MQTools()
        conn = tools.getConnection()
        channel = conn.channel()
        channel.queue_declare(queue='animals')
        channel.basic_consume(self.callback,queue='animals',no_ack=True)
        channel.start_consuming()

def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

'''
1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channelis closed, connection is
closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中
'''
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.queue_declare(queue='animals')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='animals',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()

'''
2 durable   消息不丢失
'''
# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='animal', durable=True)
channel.basic_publish(exchange='',routing_key='animal',
                      body='lions!',properties=pika.BasicProperties(delivery_mode=2, # make message persistent
                     
))
print(" [x] Sent'Hello World!'")
connection.close()

# 消费者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='hello',no_ack=False)

print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()

'''
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取奇数序列的任务,消费者1去队列中
获取偶数序列的任务。channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
'''

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='hello',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()

'''
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。
所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
'''

# 发布者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent%r" % message)
connection.close()

# 订阅者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',queue=queue_name)

print(' [*] Waiting forlogs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

'''
5、关键字发送
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,
即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字
 判定应该将数据发送至指定队列。
'''
#生产者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(" [x] Sent%r:%r" % (severity, message))
connection.close()

# 消费者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

'''
6、模糊匹配
exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,
exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列
# 表示可以匹配 0 个或多个单词
*  表示只能匹配一个单词
'''
# 生产者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent%r:%r" % (routing_key, message))
connection.close()

# 消费者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage:%s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)

print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

 

Python之队列和数据库相关推荐

  1. python如何删除mysql数据库_python删除数据mysql数据库连接

    Python学习之旅:访问MySQL数据库 Python学习之旅:访问MySQL数据库 MySQL是Web世界中使用最广泛的数据库服务器.为服务器端设计的数据库,能承受高并发访问. python如何使 ...

  2. python有序队列_Python 队列

    所谓队列 队列是有序集合,添加操作发生在"尾部",移除操作则发生在"头部". 新元素从尾部进入 队列,然后一直向前移动到头部,直到成为下一个被移除的元素. 新添 ...

  3. python资料库-Python对接六大主流数据库(只需三步)

    作为近两年来最火的编程语言的python,受到广大程序员的追捧必然是有其原因的,如果要挑出几点来讲的话,第一条那就python语法简洁,易上手,第二条呢? 便是python有着极其丰富的第三方的库. ...

  4. python资料库-Python对接六大主流数据库,只需三步

    作为近两年来最火的编程语言的python,受到广大程序员的追捧必然是有其原因的,如果要挑出几点来讲的话,第一条那就python语法简洁,易上手,第二条呢? 便是python有着极其丰富的第三方的库. ...

  5. python资料库-Python操作三大主流数据库

    学会使用的技术栈:python flask redis mongoDB mysql 第1章 数据库简介 简单介绍Mysql.数据库简介.导学篇 第2章 mysql基础 XAMPP 集成好的 最流行的P ...

  6. 【转载】Python对接六大主流数据库,只需三步

    转载网址: https://developer.51cto.com/art/201907/600444.htm Mysql 安装pymysql Oracle python中对接oracle数据库,使用 ...

  7. Python应用实战系列-如何通过Python来操作Oracle数据库:cx_Oracle

    最近需要将一批数据从csv文件中迁移到Oracle数据库中,打算用Python来实现,趁着这个机会,写一篇博客学习总结一些如何使用Python来操作Oracle数据库. 1 安装与导入 Python操 ...

  8. python 实现队列功能 queue insert() pop()

    def calculate_detection_num(self, calcu_list, detect_num):"""计算一段次数内平均识别个数"" ...

  9. MongoDB数据库(8.Python中使用mongodb数据库以及pymongo模块用法)

    在Python中使用MongoDB数据库,首先要下载pymongo模块 直接在命令行   pip install pymongo   就可以了 Python中使用pymongo模块对MongoDB数据 ...

最新文章

  1. 如何在dnn禁止复制的问题
  2. poj 1474 Video Surveillance - 求多边形有没有核
  3. JavaScript面向对象编程(1)-- 基础
  4. 30种优化查询速度的方法
  5. linux内核路由反向检查,反向路径过滤——reverse path filter
  6. 【Hibernate】could not instantiate class.. from tuple] with root cause
  7. Spring构造注入重载
  8. rds 如何学习数据库_如何将本地数据库迁移到云数据库 RDS 上?
  9. 详解HTML5网页结构
  10. 海龟交易法则06_掌握优势
  11. windows创建服务删除服务
  12. Arduino UNO测试BME280温湿度气压传感器
  13. C#开发实战1200例(第II卷)目录
  14. 日记侠:你的文章为什么阅读量会那么高?
  15. 电场刺激响应性和AIE水凝胶/调控发射波长及亮度AIE微球/AIE糖肽聚合物的研究
  16. 测试高考体育成绩的软件,高考体育成绩查询
  17. 国产男装「升级潮」下,九牧王、劲霸、海澜之家们顺利「上分」了吗?
  18. 海盗分赃问题-----简化问题,分而治之
  19. js 中国标准时间,时间戳 ,yyyy-mm-dd格式之前相互转换
  20. php 豆瓣isbn接口,ISBN书号查询

热门文章

  1. 山西专科学校计算机专业排名,河南单招计算机专业专科学校排名
  2. 在js中访问html页面,javascript – 在IE9的html页面中访问js里面的全局函数
  3. JavaScript变量高级定义之Object.defineProperty()方法讲解
  4. tomcat 在WIN10 上运行出现500错误的解决方法
  5. php包含文件不存在,PHP包含文件错误,服务器有该文件,直接访问提示不存在
  6. mysql binlog查看工具_数据同步工具otter(一)谈谈binlog和canal
  7. oracle ogg 12安装,Oracle GoldenGate Studio 12.2.1.3安装
  8. Python快速读取文件中指定的一行或多行
  9. python根据行名称生成二维数组
  10. 创建目录_聊聊Word创建目录那些事儿