上下文管理、线程池、redis订阅和发布
一:上下文管理:
对于一些对象在使用之后,需要关闭操作的。比如说:socket、mysql数据库连接、文件句柄等。
都可以用上下文来管理。
语法结构:
1 Typical usage: 2 3 @contextmanager 4 def some_generator(<arguments>): 5 <setup> 6 try: 7 yield <value> 8 finally: 9 <cleanup> 10 11 This makes this: 12 13 with some_generator(<arguments>) as <variable>: 14 <body>
code:
1 import socket 2 import contextlib 3 4 5 @contextlib.contextmanager 6 def sock_server(host,port): 7 sk=socket.socket() 8 sk.bind((host,port)) 9 sk.listen(4) 10 try: 11 yield sk 12 finally: 13 sk.close() 14 15 with sock_server("127.0.0.1",22) as soc: 16 print(soc)
执行顺序:
解释:python从上到下依次解释:
1、当到with的时候,执行with内socket_server("127.0.0.1",22),跳到
2、被contextlib.contextmanager装饰的函数。
3、依次执行函数socket_server到yield 并把sk返回给第4步的sco变量
4、然后执行with下面的代码块,执行print语句。
5、当with语句的代码块执行完。跳到第3步的yeild。
6、执行finally语句里的代码块。
二:线程池(threadpool)
自己版本:
1 #!/bin/env python 2 #author:evil_liu 3 #date:2016-7-21 4 #description: thread pool 5 6 import threading 7 import time 8 import queue 9 10 class Thread_Poll: 11 ''' 12 功能:该类主要实现多线程,以及线程复用。 13 ''' 14 def __init__(self,task_num,max_size): 15 ''' 16 功能:该函数是初始化线程池对象。 17 :param task_num: 任务数量。 18 :param max_size: 线程数量。 19 :return:无。 20 ''' 21 self.task_num=task_num 22 self.max_size=max_size 23 self.q=queue.Queue(task_num)#设置任务队列的。 24 self.thread_list=[] 25 self.res_q=queue.Queue()#设置结果队列。 26 27 def run(self,func,i,call_back=None): 28 ''' 29 功能:该函数是线程池运行主函数。 30 :param func: 传入任务主函数。 31 :param *args: 任务函数参数,需要是元组形式。 32 :param call_back: 回调函数。 33 :return: 无。 34 ''' 35 if len(self.thread_list)<self.max_size:#如果目前线程数小于我们定义的线程的个数,进行创建。 36 self.creat_thread() 37 misson=(func,i,call_back)#往任务队列放任务。 38 self.q.put(misson) 39 40 def creat_thread(self): 41 ''' 42 功能:该函数主要是创建线程,并调用call方法。 43 :return: 无。 44 ''' 45 t=threading.Thread(target=self.call)#创建线程 46 t.start() 47 48 def call(self): 49 ''' 50 功能:该函数是线程循环执行任务函数。 51 :return: 无。 52 ''' 53 cur_thread=threading.currentThread 54 self.thread_list.append(cur_thread) 55 event=self.q.get() 56 while True: 57 func,args,cal_ba=event#获取任务函数。 58 try: 59 res=func(*args)#执行任务函数。注意参数形式是元组形式。 60 flag="OK" 61 except Exception as e: 62 print(e) 63 res=False 64 flag="fail" 65 self.res(res,flag)#调用回调函数,将执行结果返回到队列中。 66 try: 67 event=self.q.get(timeout=2)#如果任务队列为空,获取任务超时2s超过2s线程停止执行任务,并退出。 68 except Exception: 69 self.thread_list.remove(cur_thread) 70 break 71 def res(self,res,status): 72 ''' 73 功能:该方法主要是将执行结果方法队列中。 74 :param res: 任务函数的执行结果。 75 :param status: 执行任务函数的结果,成功还是失败。 76 :return: 无。 77 ''' 78 da_res=(res,status) 79 self.res_q.put(da_res) 80 81 def task(x,y): 82 ''' 83 功能:该函数主要需要执行函数。 84 :param x: 参数。 85 :return: 返回值1,表示执行成功。 86 ''' 87 print(x) 88 return x+y 89 def wri_fil(x): 90 ''' 91 功能:该函数主要讲结果队列中的结果写入文件中。 92 :param x: 任务长度。 93 :return: 无。 94 ''' 95 while True:#将执行结果,从队列中获取结果并将结果写入文件中。 96 time.sleep(1) 97 if pool.res_q.qsize()==x:#当队列当前的长度等于任务执行次数,表示任务执行完成。 98 with open('1.txt','w') as f1: 99 for i in range(pool.res_q.qsize()): 100 try: 101 data=pool.res_q.get(timeout=2) 102 f1.write('mission result:%s,status:%s\n'%data) 103 except Exception: 104 break 105 break 106 else: 107 continue 108 if __name__ == '__main__': 109 pool=Thread_Poll(10,5)#初始化线程池对象。 110 for i in range(10):#循环任务。 111 pool.run(task,(1,2)) 112 wri_fil(10)
老师版本:注意老师在创建线程的时候,如果此时任务队列中没有任务的时候,不会创建其他线程。在线程执行完任务之后,将线程加入空闲线程的列表中,然后让当前线程去队列里获取任务,利用queue里的get()方法阻塞的作用的,如果一直阻塞的话,
然后表示空闲的列表中的加入的线程 一直有,此时表示创建线程数已经满足任务需求,如果不阻塞则空闲线程列表里没有空余线程。而是获取任务,执行任务。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import queue 5 import threading 6 import contextlib 7 import time 8 9 StopEvent = object() 10 11 12 class ThreadPool(object): 13 14 def __init__(self, max_num, max_task_num = None): 15 if max_task_num: 16 self.q = queue.Queue(max_task_num) 17 else: 18 self.q = queue.Queue() 19 self.max_num = max_num 20 self.cancel = False 21 self.terminal = False 22 self.generate_list = [] 23 self.free_list = [] 24 25 def run(self, func, args, callback=None): 26 """ 27 线程池执行一个任务 28 :param func: 任务函数 29 :param args: 任务函数所需参数 30 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 31 :return: 如果线程池已经终止,则返回True否则None 32 """ 33 if self.cancel: 34 return 35 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 36 self.generate_thread() 37 w = (func, args, callback,) 38 self.q.put(w) 39 40 def generate_thread(self): 41 """ 42 创建一个线程 43 """ 44 t = threading.Thread(target=self.call) 45 t.start() 46 47 def call(self): 48 """ 49 循环去获取任务函数并执行任务函数 50 """ 51 current_thread = threading.currentThread() 52 self.generate_list.append(current_thread) 53 54 event = self.q.get() 55 while event != StopEvent: 56 57 func, arguments, callback = event 58 try: 59 result = func(*arguments) 60 success = True 61 except Exception as e: 62 success = False 63 result = None 64 65 if callback is not None: 66 try: 67 callback(success, result) 68 except Exception as e: 69 pass 70 71 with self.worker_state(self.free_list, current_thread): 72 if self.terminal: 73 event = StopEvent 74 else: 75 event = self.q.get() 76 else: 77 78 self.generate_list.remove(current_thread) 79 80 def close(self): 81 """ 82 执行完所有的任务后,所有线程停止 83 """ 84 self.cancel = True 85 full_size = len(self.generate_list) 86 while full_size: 87 self.q.put(StopEvent) 88 full_size -= 1 89 90 def terminate(self): 91 """ 92 无论是否还有任务,终止线程 93 """ 94 self.terminal = True 95 96 while self.generate_list: 97 self.q.put(StopEvent) 98 99 self.q.queue.clear() 100 101 @contextlib.contextmanager 102 def worker_state(self, state_list, worker_thread): 103 """ 104 用于记录线程中正在等待的线程数 105 """ 106 state_list.append(worker_thread) 107 try: 108 yield 109 finally: 110 state_list.remove(worker_thread) 111 112 113 114 # How to use 115 116 117 pool = ThreadPool(5) 118 119 def callback(status, result): 120 # status, execute action status 121 # result, execute action return value 122 pass 123 124 125 def action(i): 126 print(i) 127 128 for i in range(30): 129 ret = pool.run(action, (i,), callback) 130 131 time.sleep(5) 132 print(len(pool.generate_list), len(pool.free_list)) 133 print(len(pool.generate_list), len(pool.free_list)) 134 # pool.close() 135 # pool.terminate()
转载于:https://www.cnblogs.com/evilliu/p/5724933.html
上下文管理、线程池、redis订阅和发布相关推荐
- C#多线程学习(四) 多线程的自动管理(线程池) (转载系列)——继续搜索引擎研究...
在多线程的程序中,经常会出现两种情况: 一种情况: 应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应 这一般使用ThreadPo ...
- Redis订阅与发布原理
记录下来以后方便回忆,需要时随时翻阅 Redis 订阅与发布 原理 client->pubsub_channels 是客户端维护的一个以dict结构的维护的订阅频道哈希表,VAL是NULL,不需 ...
- [转]C#多线程学习(四) 多线程的自动管理(线程池)
在多线程的程序中,经常会出现两种情况: 一种情况: 应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应 这一般使用ThreadPo ...
- C#多线程学习(四) 多线程的自动管理(线程池)
在多线程的程序中,经常会出现两种情况: 一种情况: 应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应 这一般使用ThreadPo ...
- php redis消息订阅与发布_PHP实现redis订阅和发布(用于异步任务处理)
搜索热词 1.概念 名称及含义 channel频道:生产者和消费者直接操作的对象 publish生产者:向channel发送消息 subscribe消费者:订阅一个或多个channel psubscr ...
- redis订阅和发布 消息推送php,redis发布订阅什么用
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. Redis 客户端可以订阅任意数量的频道. (推荐学习:Redis视频教程) Redis ...
- Redis 订阅与发布
PUBLISH channel msg将信息 message 发送到指定的频道 channelSUBSCRIBE channel [channel ...]订阅频道,可以同时订阅多个频道UNSUBSC ...
- python redis订阅_python实现 redis订阅与发布
订阅者可以订阅一个或多个频道,发布者向一个频道发送消息后,所有订阅这个频道的订阅者都将收到消息,而发布者也将收到一个数值,这个数值是收到消息的订阅者的数量.订阅者只能收到自它开始订阅后发布者所发布的消 ...
- Redis订阅和发布(实操教学)
什么是Redis发布订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息.Redis 客户端可以订阅任意数量的频道. 例1:c ...
最新文章
- dedecms 实现分页填写页码直接跳转到分页 【dedecms】
- mycat是什么_MYCAT学习2
- Android Studio编译问题-Error:Could not find org.jetbrains.trove4j
- 理解 e.clientX,e.clientY e.pageX
- ZZULIOJ 1108: 打印数字图形(函数专题)
- HTML5八大特性助力移动WebApp开发
- 设置VS2008 快捷键 快速注释
- python圈出车牌字符_Python+OpenCV实现车牌字符分割和识别
- SQL Server datetime数据类型设计与优化误区
- c语言报刊杂志订阅系统,中国报刊杂志大全_报刊大全_报刊杂志订阅
- Servlet规范简介
- e531网卡驱动linux,联想e531网卡驱动下载-联想e531笔记本无线网卡驱动v6.30.223.201 官方版 - 极光下载站...
- ant design of vue中表格列内容过长,需要截取并且鼠标滑过悬浮显示全部内容
- 寒月聊健康;手指麻 刺痛 无力你可能是腕管综合症
- 戴尔390计算机电源问题,DELL笔记本电脑电源适配器无电压输出故障
- GLES2.0中文API-glCopyTexSubImage2D
- python office插件_Office 数据可视化8大实用插件推荐
- 编译Qt/E时遇到incorrect register `%rbp' used with `l' suffix问题的解决办法
- RabbitMQ 消费者确认(Consumer Acknowledgements)
- 最简单DIY基于ESP8266的智能彩灯⑥(Android开发通过WIFI控制彩灯实现表白神器)
热门文章
- Python jsonpath库的使用:解析json并将结果保存到文件
- Hadoop 启动hdfs和yarn的命令
- Java客户端操作elasticsearch--创建索引(集群模式下)
- 【软考-软件设计师】计算机系统知识概览
- Collections工具类常用API使用示例
- U-Boot 之三 U-Boot 源码文件解析及移植过程详解
- Oracle 原理:DML触发器和数据库触发器
- JVM运行时结构、Java内存管理、JVM实例、HotSpot VM对象的创建、内存布局和访问定位
- springboot后台怎么获取前端传过来的excel_开源商城系统包含后台管理和手机端
- 批处理命令 / set