例如:

from multiprocessing import Pool

def f(x):

return x*x

pool = Pool(processes=4)

r=pool.map(f, range(100))

pool.close()

pool.join()

在spyder里运行直接没反应;在shell窗口里,直接报错,如下:

Process SpawnPoolWorker-15:

Traceback (most recent call last):

File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr

self.run()

File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run

self._target(*self._args, **self._kwargs)

File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker

task = get()

File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get

return ForkingPickler.loads(res)

AttributeError: Can't get attribute 'f' on

解决:

Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地变量都复制一份,因此可以使用任意的全局变量;在Windows下面,多进程是通过启动新进程完成的,所有的全局变量都是重新初始化的,在运行过程中动态生成、修改过的全局变量是不能使用的。

multiprocessing内部使用pickling传递map的参数到不同的进程,当传递一个函数或类时,pickling将函数或者类用所在模块+函数/类名的方式表示,如果对端的Python进程无法在对应的模块中找到相应的函数或者类,就会出错。

当你在Interactive Console当中创建函数的时候,这个函数是动态添加到__main__模块中的,在重新启动的新进程当中不存在,所以会出错。

当不在Console中,而是在独立Python文件中运行时,你会遇到另一个问题:由于你下面调用multiprocessing的代码没有保护,在新进程加载这个模块的时候会重新执行这段代码,创建出新的multiprocessing池,无限调用下去。

解决这个问题的方法是永远把实际执行功能的代码加入到带保护的区域中:if __name__ == '__mian__':

补充知识:multiprocessing Pool的异常处理问题

multiprocessing.Pool开发多进程程序时,在某个子进程执行函数使用了mysql-python连接数据库,

由于程序设计问题,没有捕获到所有异常,导致某个异常错误直接抛到Pool中,导致整个Pool挂了,其异常错误如下所示:

Exception in thread Thread-3:

Traceback (most recent call last):

File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner

self.run()

File "/usr/lib64/python2.7/threading.py", line 765, in run

self.__target(*self.__args, **self.__kwargs)

File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results

task = get()

File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__

'msg': self.msg.encode('utf8') if PY2 else self.msg

AttributeError: ("'int' object has no attribute 'encode'", ,

(2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))

本文档基于以上问题对multiprocessing.Pool以及python-mysql-connector的源码实现进行分析,以定位具体的错误原因。解决方法其实很简单,不要让异常抛到Pool里就行。

问题产生场景

python 版本centos7.3自带的2.7.5版本,或者最新的python-2.7.14

mysql-connector库,版本是2.0及以上,可到官网下载最新版:mysql-connector

问题发生的code其实可以简化为如下所示:

from multiprocessing import Pool, log_to_stderr

import logging

import mysql.connector

# open multiprocessing lib log

log_to_stderr(level=logging.DEBUG)

def func():

raise mysql.connector.Error("demo test", 100)

if __name__ == "__main__":

p = Pool(3)

res = p.apply_async(func)

res.get()

所以解决问题很简单,在func里加个try-except就可以了。但是如果你好奇为什么为出现AttributeError的异常,那么可以继续往下看。

Multiprocessing.Pool的实现

通过查看源码,大致上multiprocess.Pool的实现如下图所示:

当我们执行以下语句时,主进程会创建三个子线程:_handle_workers、_handle_results、_handle_tasks;同时会创建Pool(n)个数的worker子进程。主进程与各个worker子进程间的通信使用内部定义的Queue,其实就是Pipe管道通信,如上图的_taskqueue、_inqueue和_outqueue。

p = Pool(3)

res = p.apply_async(func)

res.get()

这三个子线程的作用是:

1. handle_workers线程管理worker进程,使进程池维持Pool(n)个worker进程数;

2. handle_tasks线程将用户的任务(包括job_id, 处理函数func等信息)传递到_inqueue中,子进程们竞争获取任务,然后运行相关函数,将结果放在_outqueue中,然后继续监听tasksqueue的任务列表。其实就是典型的生产消费问题。

3. handle_results线程监听_outQqueue的内容,有就拿到,通过字典_cache找到对应的job,将结果存储在*Result对象中,释放该job的信号量,表明job执行完毕。此后,就可以通过*Result.get()函数获取执行结果。

当我们调用p.apply_async 或者p.map时,其实就是创建了AsyncResult或者MapResult对象,然后将task放到_taskqueue中;调用*Result.get()方法等待task被worker子进程执行完成,获取执行结果。

在知道了multprocess.Pool的实现逻辑后,现在我们来探索下,当func将异常抛出时,Pool的worker是怎么处理的。下面的代码是pool.worker工作子进程的核心执行函数的简化版。

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):

...

while xxx:

try:

task = get()

except:

...

job, i, func, args, kwds = task

try:

result = (True, func(*args, **kwds))

except Exception, e:

result = (False, e)

...

try:

put((job, i, result))

except Exception, e:

...

从代码中可以看到,在执行func时,如果func抛出异常,那么worker会将异常对象直接放入到_outqueue中,然后等待下一个task。也就是说,worker是可以处理异常的。

那么接下来看看_handle_result线程是怎么处理worker发过来的结果的。如下所示:

@staticmethod

def _handle_results(outqueue, get, cache):

while 1:

try:

task = get()

except (IOError, EOFError):

return

...

上述代码为_handle_result的主要处理逻辑,可以看到,它只对 IOError, EOFError进行了处理,也就是说,如果在get()时发生了其它异常错误,将导致_handle_result这个线程直接退出(而事实上的确如此)。既然_handle_result退出了,那么就没有动作来触发_cache中*Result对象释放信号量,则用户的执行流程就一直处于wait状态。这样,用户主进程就会一直卡在get()中,导致主流程执行不下去。

我们通过打开multiprocessing库的日志(log_to_stderr(level=logging.DEBUG)),然后修改multiprocessing.Pool中_handel_result的代码,加上一个except Exception,然后运行文章一开始的的异常代码,如下所示:

# multiprocessing : pool.py

#

class Pool(object):

@staticmethod

def _handle_results(outqueue, get, cache):

while 1:

try:

task = get()

except (IOError, EOFError):

return

except Exception:

debug("handle_result not catch Exceptions.")

return

...

控制台如果输出"handle_result not catch Exceptions.",表明_handle_results没有catch到所有的异常。而实际上,真的是由于task = get()这句话抛异常了。

那么,_outqueue.get()方法做了什么。深入查看源码,发现get()方法其实就是os.pipe的read/write方法,但是做了一些处理吧。其内部实现大致如下:

def Pipe(duplex=True):

...

fd1, fd2 = os.pipe()

c1 = _multiprocessing.Connection(fd1, writable=False) # get

c2 = _multiprocessing.Connection(fd2, readable=False) # put

return c1, c2

_multiprocessing.Connection内部使用了C的实现,就不再深入了,否则会就越来越复杂了。它内部应该使用了pickle库,在put时将对象实例pickle(也就是序列化吧),然后在get时将实例unpikcle,重新生成实例对象。具体可查看python官方文档关于pickle的介绍(包括object可pickle的条件以及在unpickle时调用的方法等)。不管如何,就是实例在get,即unpickle的过程出错了。

'msg': self.msg.encode('utf8') if PY2 else self.msg

AttributeError: 'int' object has no attribute 'encode'

从上述错误日志中可以看到,表明在重构时msg参数传入了int类型变量。就是说在unpickle阶段,Mysql Error重新实例化时执行了__init__()方法,但是传参错误了。为了验证这一现象,我将MySql Error的__init__()进行简化,最终确认到self.args的赋值上,即Exception及其子类在unpickle时会调用__init__()方法,并将self.args作为参数列表传递给__init__()。

通过以下代码可以简单的验证问题:

import os

from multiprocessing import Pipe

class DemoError(Exception):

def __init__(msg, errno):

print "msg: %s, errno: %s" % (msg, errno)

self.args = ("aa", "bb")

def func():

raise DemoError("demo test", 100)

r, w = Pipe(duplex=False)

try:

result = (True, func(1))

except Exception, e:

result = (False, e)

print "send result"

w.send(result)

print "get result"

res = r.recv()

print "finished."

日志会在recv调用时打印 msg: aa, errno: bb,表明recv异常类Exception时会将self.args作为参数传入init()函数中。而Mysql的Error类重写self.args变量,而且顺序不对,导致msg在执行编码时出错。MySql Error的实现简化如下:

class Error(Exception):

def __init__(self, msg=None, errno=None, values=None, sqlstate=None):

super(Error, self).__init__()

...

if self.msg and self.errno != -1:

fields = {

'errno': self.errno,

'msg': self.msg.encode('utf-8') if PY2 else self.msg

}

...

self.args = (self.errno, self._full_msg, self.sqlstate)

可以看到,mysql Error中的self.args与__init__(msg, errno, values, sqlstate)的顺序不一,因此self.args第一个参数errno传给了msg,导致AttributeError。至于self.args是什么,简单查了下,是Exception类中定义的,一般用__str__或者__repr__方法的输出,python官方文档不建议overwrite。

总结

好吧,说了这么多,通过问题的追踪,我们也基本上了解清楚multiprocessing.Pool库的实现了。事实上,也很难说是谁的bug,是两者共同作用下出现的。不管如何,希望在用到multiprocessing库时,特别与Pipe相关时,谨慎点使用,最好的不要让异常跑到multiprocess中处理,应该在func中将所有的异常处理掉,如果有自己定于的异常类,请最好保证self.args的顺序与__init__()的顺序一致。同时,网上好像也听说使用multprocessing和subprocess库出现问题,或许也是这个异常抛出的问题,毕竟suprocessError定义与Exception好像有些区别。

以上这篇解决windows下python3使用multiprocessing.Pool出现的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

python进程池win出错_解决windows下python3使用multiprocessing.Pool出现的问题相关推荐

  1. 在windows下python3使用multiprocessing.Pool时出现的问题

    例如: from multiprocessing import Pool def f(x): return x*x pool = Pool(processes=4) r=pool.map(f, ran ...

  2. python进程池win出错,python进程池multiprocessing.Pool运行错误:

    错误: Traceback (most recent call last): File "", line 1, in File "C:\Program Files\Ana ...

  3. python2没有pip命令_解决Windows下python和pip命令无法使用的问题

    一. python命令找不到 安装python之后经常会出现下面的问题 , python命令找不到,这是因为Windows的环境变量中没有定义python的安装路径 这个时候我们先找到python的安 ...

  4. 关闭终端php就退出进程_解决windows下php-cgi进程经常自动关闭

    php-cgi在linux中有fpm管理,Apache不是采用cgi这种模式,于是乎就出现了,在windows下,采用nginx时,开启的php-cgi监听非常不稳定,经常奔溃关闭.现在解决了这个问题 ...

  5. python更新包列表出错_解决pycharm无法获取安装包文件列表

    Pycharm 中Available packages nothing to show Manage Repositories: https://pypi.python.org/pypi https: ...

  6. python 进程池 等待数量_【2020Python修炼记】python并发编程(六)补充—进程池和线程池...

    1. 2. 为啥要有 进程池和线程池 进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数 3.创建进程池的类pool 如果指定numprocess为3,则进程池会从无到有创建三 ...

  7. sublime运行python代码python没显示_解决windows下Sublime Text 2 运行 PyQt 不显示的方法分享...

    解决方案 搜了一下,找到一个 Linux 下的解决方案,如下所示: Sublime Text2 运行pySide/pyQt程序的问题 Ctrl-B后,界面不会弹出来,但是后台进程里面有"py ...

  8. python 进程池pool使用详解

    和选用线程池来关系多线程类似,当程序中设置到多进程编程时,Python 提供了更好的管理多个进程的方式,就是使用进程池. 在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录,或者远程 ...

  9. Python 进程池 multiprocessing.Pool - Python零基础入门教程

    目录 一.Python 进程池 multiprocessing.Pool 介绍 二.Python 进程池 multiprocessing.Pool 使用 三.猜你喜欢 零基础 Python 学习路线推 ...

最新文章

  1. 【c++】基本数据类型
  2. 第三章 处理机调度与死锁
  3. Rtx 实时通知实现
  4. 企业形象广告的几个突破要点
  5. 数据结构----依据出栈顺序判断所需的最少栈空间
  6. 西工大18秋《C语言程序设计》平时作业,西工大18秋C语言程序设计平时作业答案...
  7. Item 27 避免使用ICloneable接口
  8. jpype测试报错,找不到类raise _RUNTIMEEXCEPTION.PYEXC(Class %s not found % name)
  9. WebRAY权小文:产品就是工程师的尊严
  10. 解决 谷歌chrome浏览器开启麦克风
  11. java程序中的异常404 505错误
  12. python自动上传百度网盘_树莓派使用百度云盘自动上传存储监控照片
  13. 翻译来自HiDDeN网络架构-Lifeifei
  14. HDU 6441 Find Integer
  15. Launcher进程启动流程
  16. python 绘制 3D 曲面
  17. MACOS 苹果系统 微信多开
  18. std::ifstream实例
  19. python数据分析-matplotlib可视化
  20. N71005-第五周

热门文章

  1. html5 ul下的li重叠解决,html – 如何仅在嵌套的ul中悬停当前的li?
  2. 玄元剑仙手游最新服务器,玄元剑仙最新
  3. 为什么java button 不能用 显示红色_Java中的整型包装类值的比较为什么不能用==比较?原因是因为缓存...
  4. java 数组以逗号分隔_在java中使用逗号分隔符拆分字符串数组
  5. 食物和计算机中一样的英语,关于电脑和食物的英语口语
  6. php网页留言本过程,PHP实现简单留言本功能代码示例
  7. com.sun.jersey.api.client.ClientHandlerException: java.net.ConnectException: Connection refused
  8. reactive streams的Mono及Flux
  9. IntelliJ IDEA统计项目代码行数
  10. Spring Cloud Config 使用总结