一、上下文管理

import  contextlib
@contextlib.contextmanager
def work_state(state_list,worker_thread):state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread)
free_list=[]
current_thread="alex"
with work_state(free_list,current_thread):print(123)print(456)#以下为执行结果:
123
456

代码执行步骤

上下文用于需要 close()方法的模块

import  contextlib
import  socket@contextlib.contextmanager
def context_socket(host,port):sk=socket.socket()sk.bind((host,port))sk.listen(5)try:yield skfinally:sk.close()
with context_socket('127.0.0.1',8888) as sock:print(sock)#以下为执行结果:
<socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8888)>

二、redis 发布订阅

#redis2.py 主程序import  redis
class RedisHelper:def __init__(self):self.__conn=redis.Redis(host='192.168.11.87')def public(self,msg,chan):self.__conn.publish(chan,msg)return  Truedef subscribe(self,chan):pub=self.__conn.pubsub()pub.subscribe(chan)pub.parse_response()return  pub

订阅

import redis2obj= redis2.RedisHelper()
data=obj.subscribe('fm111.7')
print(data.parse_response())#接收到发布信息:
[b'message', b'fm111.7', b'aaaaaa']

发布

import redis2obj= redis2.RedisHelper()
obj.public('alex_db','f111.7')

三、RabbitMQ

import pika#生产者 发布
connection =pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))channel = connection.channel()
channel.queue_declare(queue='hello_wuwenyu')                 #创建队列,存在则忽略
channel.basic_publish(exchange='', routing_key='hello_wuwenyu', body='Hello World') print("[x] Sent 'Hello World!'") connection.close
import pika#消费者 订阅
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()
channel.queue_declare(queue='hello_wuwenyu')  #
def callback(ch,method,properties,body):print(" [x] Received %r" % body)
channel.basic_consume(callback,queue='hello_wuwenyu',no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#接收到生产者发来的消息:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World'

  2 exchange 绑定多个队列

#

import pika#生产者 发布
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',type='fanout')message = '456'
channel.basic_publish(exchange='logs_fanout',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()

import pika#订阅
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',type='fanout')# 随机创建队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 绑定
channel.queue_bind(exchange='logs_fanout',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()#执行多次消费端,随机产生多个队列,每个队列都接收到消息:
[*] Waiting for logs. To exit press CTRL+C[x] b'456'

关键字

#生产者  severity = 'info'      severity = 'errer'   执行两次
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')severity = 'info'     # severity = 'errer'
message = '123'
channel.basic_publish(exchange='direct_logs_wuwenyu',routing_key=severity,body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

#订阅 消费 客户端1
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queueseverities =  ['error','info','warning']for severity in severities:channel.queue_bind(exchange='direct_logs_wuwenyu',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()#接受到的消息:[*] Waiting for logs. To exit press CTRL+C[x] 'error':b'123'[x] 'info':b'123'

#订阅 消费 客户端2
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queueseverities =  ['error',]for severity in severities:channel.queue_bind(exchange='direct_logs_wuwenyu',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
#接受到的消息:[*] Waiting for logs. To exit press CTRL+C[x] 'info':b'123'

四、SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

MySQL-Python
mysql+mysqldb://:@[:]/pymysql
mysql+pymysql://:@/[?]MySQL-Connector
mysql+mysqlconnector://:@[:]/cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

步骤一:

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

#!/usr/bin/env python
# -*- coding:utf-8 -*-fromsqlalchemy importcreate_engineengine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)engine.execute(
"INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%s, %s)",
((555, "v1"),(666, "v1"),)
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
id=999, name="v1"
)result =engine.execute('select * from ts_test')
result.fetchall()

事务操作

注:查看数据库连接:show status like 'Threads%';

步骤二:

使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。

#!/usr/bin/env python
# -*- coding:utf-8 -*-fromsqlalchemy importcreate_engine, Table, Column, Integer, String, MetaData, ForeignKeymetadata =MetaData()user =Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)color =Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)metadata.create_all(engine)
# metadata.clear()
# metadata.remove()

增删改查

更多内容详见:

http://www.jianshu.com/p/e6bba189fcbd

http://docs.sqlalchemy.org/en/latest/core/expression_api.html

注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。

步骤三:

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

#!/usr/bin/env python
# -*- coding:utf-8 -*-fromsqlalchemy.ext.declarative importdeclarative_base
fromsqlalchemy importColumn, Integer, String
fromsqlalchemy.orm importsessionmaker
fromsqlalchemy importcreate_engineengine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)Base =declarative_base()classUser(Base):
__tablename__ ='users'
id=Column(Integer, primary_key=True)
name =Column(String(50))# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)Session =sessionmaker(bind=engine)
session =Session()# ########## 增 ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
#     User(id=3, name='sb'),
#     User(id=4, name='sb')
# ])
# session.commit()# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name='sb').first()# ret = session.query(User).filter_by(name='sb').all()
# print ret# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print ret# ret = session.query(User.name.label('name_label')).all()
# print ret,type(ret)# ret = session.query(User).order_by(User.id).all()
# print ret# ret = session.query(User).order_by(User.id)[1:3]
# print ret
# session.commit()

  

  

  

转载于:https://www.cnblogs.com/wudalang/p/5700242.html

上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy相关推荐

  1. 上下文管理、线程池、redis订阅和发布

    一:上下文管理: 对于一些对象在使用之后,需要关闭操作的.比如说:socket.mysql数据库连接.文件句柄等. 都可以用上下文来管理. 语法结构: 1 Typical usage: 2 3 @co ...

  2. Redis 进阶篇:发布订阅模式原理与运用

    Redis 通过 SUBSCRIBE,UNSUBSCRIBE和 PUBLISH 实现发布订阅消息传递模式,Redis 提供了两种模式实现,分别是「发布 / 订阅到频道」和「发布 \ 订阅到模式」. [ ...

  3. python redis订阅_python实现 redis订阅与发布

    订阅者可以订阅一个或多个频道,发布者向一个频道发送消息后,所有订阅这个频道的订阅者都将收到消息,而发布者也将收到一个数值,这个数值是收到消息的订阅者的数量.订阅者只能收到自它开始订阅后发布者所发布的消 ...

  4. nodejs redis 发布订阅_SpringBoot整合Redis,怎么实现发布/订阅?

    一.简介 1.发布订阅 SUBSCRIBE, UNSUBSCRIBE 和 PUBLISH 实现了 发布/订阅消息范例,发送者 (publishers) 不用编程就可以向特定的接受者发送消息 (subs ...

  5. redis之mq实现发布订阅模式

    https://github.com/smltq/spring-boot-demo/blob/master/mq-redis 概述 Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用 ...

  6. Redis源码剖析(五)订阅与发布

    Redis提供了订阅和发布的功能,允许客户端订阅一个或多个频道,当其他客户端向某个频道发送消息时,服务器会将消息转发给所有订阅该频道的客户端 这一点有点像群聊的功能,一个客户端将消息发往群中(向某个频 ...

  7. StackExchange.Redis学习笔记(五) 发布和订阅

    StackExchange.Redis学习笔记(五) 发布和订阅 原文:StackExchange.Redis学习笔记(五) 发布和订阅 Redis命令中的Pub/Sub Redis在 2.0之后的版 ...

  8. redis(18)--发布和订阅

    目录 频道的订阅与退订 订阅频道 退订频道 模式的订阅与退订 订阅模式 退订模式 发送消息 查阅订阅信息 Redis 通过 PUBLISH . SUBSCRIBE,PSUBSCRIBE 等命令实现了订 ...

  9. Redis实现消息队列和订阅发布模式

    转载:https://www.cnblogs.com/qlqwjy/p/9763754.html 在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面 ...

最新文章

  1. 学习javascript 非常好的博客
  2. background 距离右边固定距离
  3. 趣学python3(47)--尾递归
  4. 无法打开输入文件mysql_错误LNK1181,pip安装“无法打开输入文件”mysqlclient.lib'...
  5. 框架实现修改功能的原理_JAVA集合框架的特点及实现原理简介
  6. 第一百三十二期:MySQL系列:一句SQL,MySQL是怎么工作的?
  7. 菜鸟教程android布局,Android菜鸟级教程
  8. 用汇编的眼光看C++(之class构造、析构)
  9. Algorithm:贪心策略之区间调度问题
  10. 再获全球顶会ASPLOS认可:阿里云神龙凭什么打破物理机神话
  11. rhel6下,mysql 5.6.14 主从复制(也称mysql AB复制)环境配置[基于binlog]
  12. python数据框添加一列无列名_Pandas只使用列名创建空数据框
  13. MaxCompute/DataWorks权限问题排查建议
  14. 读书篇:《细说PHP》四、数组
  15. 如何运用创客匠人微信小程序实现引流拓客?
  16. windows下删除不掉文件夹:找不到该项目无法删除文件夹?
  17. Hi3519A调试记录
  18. 怎么大量转换图片格式为tiff
  19. HTML自动填充表格,word文档中如何实现表格自动填充
  20. 移动硬盘插服务器上坏了,移动硬盘接口坏了怎么办解决教程

热门文章

  1. IDEA——常用基础设置
  2. linux绑定team网卡,linux 实现双网卡绑定单个IP——team篇
  3. php post请求后端拿不到值_Ajax 提交POST后,后台php 无法获取$POST值
  4. java 对象视图框架_Stripes视图框架Java对象属性验证和prototype.js Ajax的测试
  5. Matlab guide菜单+快捷菜单的使用
  6. 如何用手机打开dcm格式图片_如何防止自己的图片被盗用?这 4 招教你优雅加水印...
  7. php相隔几分钟变换随机数,PHP怎么固定随机出号几分钟时间再变?
  8. python用户登录_python用户登录系统
  9. android 服务器返回302,Android WebView 内处理302重定向不跳转的解决
  10. 解决 sessionStroage 无法在多个标签页共享数据的问题