一:上下文管理:

对于一些对象在使用之后,需要关闭操作的。比如说:socket、mysql数据库连接、文件句柄等。

都可以用上下文来管理。

语法结构:

 1     Typical usage:
 2
 3         @contextmanager
 4         def some_generator(<arguments>):
 5             <setup>
 6             try:
 7                 yield <value>
 8             finally:
 9                 <cleanup>
10
11     This makes this:
12
13         with some_generator(<arguments>) as <variable>:
14             <body>

code:

 1 import socket
 2 import contextlib
 3
 4
 5 @contextlib.contextmanager
 6 def sock_server(host,port):
 7     sk=socket.socket()
 8     sk.bind((host,port))
 9     sk.listen(4)
10     try:
11         yield sk
12     finally:
13         sk.close()
14
15 with sock_server("127.0.0.1",22) as soc:
16     print(soc)

执行顺序:

解释:python从上到下依次解释:

1、当到with的时候,执行with内socket_server("127.0.0.1",22),跳到

2、被contextlib.contextmanager装饰的函数。

3、依次执行函数socket_server到yield 并把sk返回给第4步的sco变量

4、然后执行with下面的代码块,执行print语句。

5、当with语句的代码块执行完。跳到第3步的yeild。

6、执行finally语句里的代码块。

二:线程池(threadpool)

自己版本:

  1 #!/bin/env python
  2 #author:evil_liu
  3 #date:2016-7-21
  4 #description: thread pool
  5
  6 import threading
  7 import time
  8 import queue
  9
 10 class Thread_Poll:
 11     '''
 12     功能:该类主要实现多线程,以及线程复用。
 13     '''
 14     def __init__(self,task_num,max_size):
 15         '''
 16         功能:该函数是初始化线程池对象。
 17         :param task_num: 任务数量。
 18         :param max_size: 线程数量。
 19         :return:无。
 20         '''
 21         self.task_num=task_num
 22         self.max_size=max_size
 23         self.q=queue.Queue(task_num)#设置任务队列的。
 24         self.thread_list=[]
 25         self.res_q=queue.Queue()#设置结果队列。
 26
 27     def run(self,func,i,call_back=None):
 28         '''
 29         功能:该函数是线程池运行主函数。
 30         :param func: 传入任务主函数。
 31         :param *args: 任务函数参数,需要是元组形式。
 32         :param call_back: 回调函数。
 33         :return: 无。
 34         '''
 35         if len(self.thread_list)<self.max_size:#如果目前线程数小于我们定义的线程的个数,进行创建。
 36             self.creat_thread()
 37         misson=(func,i,call_back)#往任务队列放任务。
 38         self.q.put(misson)
 39
 40     def creat_thread(self):
 41         '''
 42         功能:该函数主要是创建线程,并调用call方法。
 43         :return: 无。
 44         '''
 45         t=threading.Thread(target=self.call)#创建线程
 46         t.start()
 47
 48     def call(self):
 49         '''
 50         功能:该函数是线程循环执行任务函数。
 51         :return: 无。
 52         '''
 53         cur_thread=threading.currentThread
 54         self.thread_list.append(cur_thread)
 55         event=self.q.get()
 56         while True:
 57             func,args,cal_ba=event#获取任务函数。
 58             try:
 59                 res=func(*args)#执行任务函数。注意参数形式是元组形式。
 60                 flag="OK"
 61             except Exception as e:
 62                 print(e)
 63                 res=False
 64                 flag="fail"
 65             self.res(res,flag)#调用回调函数,将执行结果返回到队列中。
 66             try:
 67                 event=self.q.get(timeout=2)#如果任务队列为空,获取任务超时2s超过2s线程停止执行任务,并退出。
 68             except Exception:
 69                 self.thread_list.remove(cur_thread)
 70                 break
 71     def res(self,res,status):
 72         '''
 73         功能:该方法主要是将执行结果方法队列中。
 74         :param res: 任务函数的执行结果。
 75         :param status: 执行任务函数的结果,成功还是失败。
 76         :return: 无。
 77         '''
 78         da_res=(res,status)
 79         self.res_q.put(da_res)
 80
 81 def task(x,y):
 82     '''
 83     功能:该函数主要需要执行函数。
 84     :param x: 参数。
 85     :return: 返回值1,表示执行成功。
 86     '''
 87     print(x)
 88     return x+y
 89 def wri_fil(x):
 90     '''
 91     功能:该函数主要讲结果队列中的结果写入文件中。
 92     :param x: 任务长度。
 93     :return: 无。
 94     '''
 95     while True:#将执行结果,从队列中获取结果并将结果写入文件中。
 96         time.sleep(1)
 97         if  pool.res_q.qsize()==x:#当队列当前的长度等于任务执行次数,表示任务执行完成。
 98             with open('1.txt','w') as f1:
 99                 for i in  range(pool.res_q.qsize()):
100                     try:
101                         data=pool.res_q.get(timeout=2)
102                         f1.write('mission result:%s,status:%s\n'%data)
103                     except Exception:
104                         break
105             break
106         else:
107             continue
108 if __name__ == '__main__':
109     pool=Thread_Poll(10,5)#初始化线程池对象。
110     for i in range(10):#循环任务。
111         pool.run(task,(1,2))
112     wri_fil(10)

老师版本:注意老师在创建线程的时候,如果此时任务队列中没有任务的时候,不会创建其他线程。在线程执行完任务之后,将线程加入空闲线程的列表中,然后让当前线程去队列里获取任务,利用queue里的get()方法阻塞的作用的,如果一直阻塞的话,

然后表示空闲的列表中的加入的线程 一直有,此时表示创建线程数已经满足任务需求,如果不阻塞则空闲线程列表里没有空余线程。而是获取任务,执行任务。

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8
  9 StopEvent = object()
 10
 11
 12 class ThreadPool(object):
 13
 14     def __init__(self, max_num, max_task_num = None):
 15         if max_task_num:
 16             self.q = queue.Queue(max_task_num)
 17         else:
 18             self.q = queue.Queue()
 19         self.max_num = max_num
 20         self.cancel = False
 21         self.terminal = False
 22         self.generate_list = []
 23         self.free_list = []
 24
 25     def run(self, func, args, callback=None):
 26         """
 27         线程池执行一个任务
 28         :param func: 任务函数
 29         :param args: 任务函数所需参数
 30         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 31         :return: 如果线程池已经终止,则返回True否则None
 32         """
 33         if self.cancel:
 34             return
 35         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 36             self.generate_thread()
 37         w = (func, args, callback,)
 38         self.q.put(w)
 39
 40     def generate_thread(self):
 41         """
 42         创建一个线程
 43         """
 44         t = threading.Thread(target=self.call)
 45         t.start()
 46
 47     def call(self):
 48         """
 49         循环去获取任务函数并执行任务函数
 50         """
 51         current_thread = threading.currentThread()
 52         self.generate_list.append(current_thread)
 53
 54         event = self.q.get()
 55         while event != StopEvent:
 56
 57             func, arguments, callback = event
 58             try:
 59                 result = func(*arguments)
 60                 success = True
 61             except Exception as e:
 62                 success = False
 63                 result = None
 64
 65             if callback is not None:
 66                 try:
 67                     callback(success, result)
 68                 except Exception as e:
 69                     pass
 70
 71             with self.worker_state(self.free_list, current_thread):
 72                 if self.terminal:
 73                     event = StopEvent
 74                 else:
 75                     event = self.q.get()
 76         else:
 77
 78             self.generate_list.remove(current_thread)
 79
 80     def close(self):
 81         """
 82         执行完所有的任务后,所有线程停止
 83         """
 84         self.cancel = True
 85         full_size = len(self.generate_list)
 86         while full_size:
 87             self.q.put(StopEvent)
 88             full_size -= 1
 89
 90     def terminate(self):
 91         """
 92         无论是否还有任务,终止线程
 93         """
 94         self.terminal = True
 95
 96         while self.generate_list:
 97             self.q.put(StopEvent)
 98
 99         self.q.queue.clear()
100
101     @contextlib.contextmanager
102     def worker_state(self, state_list, worker_thread):
103         """
104         用于记录线程中正在等待的线程数
105         """
106         state_list.append(worker_thread)
107         try:
108             yield
109         finally:
110             state_list.remove(worker_thread)
111
112
113
114 # How to use
115
116
117 pool = ThreadPool(5)
118
119 def callback(status, result):
120     # status, execute action status
121     # result, execute action return value
122     pass
123
124
125 def action(i):
126     print(i)
127
128 for i in range(30):
129     ret = pool.run(action, (i,), callback)
130
131 time.sleep(5)
132 print(len(pool.generate_list), len(pool.free_list))
133 print(len(pool.generate_list), len(pool.free_list))
134 # pool.close()
135 # pool.terminate()

转载于:https://www.cnblogs.com/evilliu/p/5724933.html

上下文管理、线程池、redis订阅和发布相关推荐

  1. C#多线程学习(四) 多线程的自动管理(线程池) (转载系列)——继续搜索引擎研究...

    在多线程的程序中,经常会出现两种情况: 一种情况:   应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应                   这一般使用ThreadPo ...

  2. Redis订阅与发布原理

    记录下来以后方便回忆,需要时随时翻阅 Redis 订阅与发布 原理 client->pubsub_channels 是客户端维护的一个以dict结构的维护的订阅频道哈希表,VAL是NULL,不需 ...

  3. [转]C#多线程学习(四) 多线程的自动管理(线程池)

    在多线程的程序中,经常会出现两种情况: 一种情况:   应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应                   这一般使用ThreadPo ...

  4. C#多线程学习(四) 多线程的自动管理(线程池)

    在多线程的程序中,经常会出现两种情况: 一种情况:   应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应                   这一般使用ThreadPo ...

  5. php redis消息订阅与发布_PHP实现redis订阅和发布(用于异步任务处理)

    搜索热词 1.概念 名称及含义 channel频道:生产者和消费者直接操作的对象 publish生产者:向channel发送消息 subscribe消费者:订阅一个或多个channel psubscr ...

  6. redis订阅和发布 消息推送php,redis发布订阅什么用

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. Redis 客户端可以订阅任意数量的频道. (推荐学习:Redis视频教程) Redis ...

  7. Redis 订阅与发布

    PUBLISH channel msg将信息 message 发送到指定的频道 channelSUBSCRIBE channel [channel ...]订阅频道,可以同时订阅多个频道UNSUBSC ...

  8. python redis订阅_python实现 redis订阅与发布

    订阅者可以订阅一个或多个频道,发布者向一个频道发送消息后,所有订阅这个频道的订阅者都将收到消息,而发布者也将收到一个数值,这个数值是收到消息的订阅者的数量.订阅者只能收到自它开始订阅后发布者所发布的消 ...

  9. Redis订阅和发布(实操教学)

    什么是Redis发布订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息.Redis 客户端可以订阅任意数量的频道. 例1:c ...

最新文章

  1. dedecms 实现分页填写页码直接跳转到分页 【dedecms】
  2. mycat是什么_MYCAT学习2
  3. Android Studio编译问题-Error:Could not find org.jetbrains.trove4j
  4. 理解 e.clientX,e.clientY e.pageX
  5. ZZULIOJ 1108: 打印数字图形(函数专题)
  6. HTML5八大特性助力移动WebApp开发
  7. 设置VS2008 快捷键 快速注释
  8. python圈出车牌字符_Python+OpenCV实现车牌字符分割和识别
  9. SQL Server datetime数据类型设计与优化误区
  10. c语言报刊杂志订阅系统,中国报刊杂志大全_报刊大全_报刊杂志订阅
  11. Servlet规范简介
  12. e531网卡驱动linux,联想e531网卡驱动下载-联想e531笔记本无线网卡驱动v6.30.223.201 官方版 - 极光下载站...
  13. ant design of vue中表格列内容过长,需要截取并且鼠标滑过悬浮显示全部内容
  14. 寒月聊健康;手指麻 刺痛 无力你可能是腕管综合症
  15. 戴尔390计算机电源问题,DELL笔记本电脑电源适配器无电压输出故障
  16. GLES2.0中文API-glCopyTexSubImage2D
  17. python office插件_Office 数据可视化8大实用插件推荐
  18. 编译Qt/E时遇到incorrect register `%rbp' used with `l' suffix问题的解决办法
  19. RabbitMQ 消费者确认(Consumer Acknowledgements)
  20. 最简单DIY基于ESP8266的智能彩灯⑥(Android开发通过WIFI控制彩灯实现表白神器)

热门文章

  1. Python jsonpath库的使用:解析json并将结果保存到文件
  2. Hadoop 启动hdfs和yarn的命令
  3. Java客户端操作elasticsearch--创建索引(集群模式下)
  4. 【软考-软件设计师】计算机系统知识概览
  5. Collections工具类常用API使用示例
  6. U-Boot 之三 U-Boot 源码文件解析及移植过程详解
  7. Oracle 原理:DML触发器和数据库触发器
  8. JVM运行时结构、Java内存管理、JVM实例、HotSpot VM对象的创建、内存布局和访问定位
  9. springboot后台怎么获取前端传过来的excel_开源商城系统包含后台管理和手机端
  10. 批处理命令 / set