我正在使用子流程模块来启动子流程并连接到其输出流(stdout)。 我希望能够在其stdout上执行非阻塞读取。 有没有一种方法可以使.readline成为非阻塞状态,或者在调用.readline之前检查流中是否有数据? 我希望这是可移植的,或者至少要在Windows和Linux下工作。

这是我目前的操作方式(如果没有可用数据, .readline.readline上阻塞):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

#1楼

免责声明:这仅适用于龙卷风

您可以通过将fd设置为非阻塞,然后使用ioloop来注册回调来实现。 我将其包装在一个名为tornado_subprocess的鸡蛋中,您可以通过PyPI安装它:

easy_install tornado_subprocess

现在您可以执行以下操作:

import tornado_subprocess
import tornado.ioloopdef print_res( status, stdout, stderr ) :print status, stdout, stderrif status == 0:print "OK:"print stdoutelse:print "ERROR:"print stderrt = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

您也可以将其与RequestHandler一起使用

class MyHandler(tornado.web.RequestHandler):def on_done(self, status, stdout, stderr):self.write( stdout )self.finish()@tornado.web.asynchronousdef get(self):t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )t.start()

#2楼

现有解决方案对我不起作用(详细信息如下)。 最终可行的方法是使用read(1)实现readline(基于此答案 )。 后者不会阻止:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming threadnextline = Nonebuf = ''while True:#--- extract line using read(1)out = myprocess.stdout.read(1)if out == '' and myprocess.poll() != None: breakif out != '':buf += outif out == '\n':nextline = bufbuf = ''if not nextline: continueline = nextlinenextline = None#--- do whatever you want with line hereprint 'Line is:', linemyprocess.stdout.close()myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop threadmyprocess.kill()myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finishp1.join()

为什么现有解决方案不起作用:

  1. 需要readline的解决方案(包括基于Queue的解决方案)始终会阻塞。 很难(不可能?)杀死执行readline的线程。 它只会在创建它的过程完成时被杀死,而不会在产生输出的过程被杀死时被杀死。
  2. 正如anonnn指出的那样,将低级fcntl与高级别readline调用混合可能无法正常工作。
  3. 使用select.poll()很简洁,但是根据python文档,它在Windows上不起作用。
  4. 使用第三方库似乎无法胜任此任务,并增加了其他依赖性。

#3楼

我已经基于JF Sebastian的解决方案创建了一个库。 您可以使用它。

https://github.com/cenkalti/什么


#4楼

编辑:此实现仍会阻止。 请改用JFSebastian的答案 。

我尝试了 最佳答案 ,但是线程代码的额外风险和维护令人担忧。

\n

通过 io模块 (仅限于2.6),我发现BufferedReader。 这是我的无线程,非阻塞解决方案。

import io
from subprocess import PIPE, Popenp = Popen(['myprogram.exe'], stdout=PIPE)SLEEP_DELAY = 0.001# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:while p.poll() == None:time.sleep(SLEEP_DELAY)while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):line = buffer.readline()# do stuff with the line# Handle any remaining output after the process has endedwhile buffer.peek():line = buffer.readline()# do stuff with the line

#5楼

这是我的代码,用于捕获子流程ASAP的每个输出,包括部分行。 它同时抽水,并且以几乎正确的顺序抽出stdout和stderr。

经过测试并在Python 2.7 linux&Windows上正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():if (len(argv) > 1) and (argv[-1] == "-sub-"):import time, sysprint "Application runned!"time.sleep(2)print "Slept 2 second"time.sleep(1)print "Slept 1 additional second",time.sleep(2)sys.stderr.write("Stderr output after 5 seconds")print "Eol on stdin"sys.stderr.write("Eol on stderr\n")time.sleep(1)print "Wow, we have end of work!",else:os.environ["PYTHONUNBUFFERED"]="1"try:p = Popen( argv + ["-sub-"],bufsize=0, # line-bufferedstdin=PIPE, stdout=PIPE, stderr=PIPE )except WindowsError, W:if W.winerror==193:p = Popen( argv + ["-sub-"],shell=True, # Try to run via shellbufsize=0, # line-bufferedstdin=PIPE, stdout=PIPE, stderr=PIPE )else:raiseinp = Queue.Queue()sout = io.open(p.stdout.fileno(), 'rb', closefd=False)serr = io.open(p.stderr.fileno(), 'rb', closefd=False)def Pump(stream, category):queue = Queue.Queue()def rdr():while True:buf = stream.read1(8192)if len(buf)>0:queue.put( buf )else:queue.put( None )returndef clct():active = Truewhile active:r = queue.get()try:while True:r1 = queue.get(timeout=0.005)if r1 is None:active = Falsebreakelse:r += r1except Queue.Empty:passinp.put( (category, r) )for tgt in [rdr, clct]:th = Thread(target=tgt)th.setDaemon(True)th.start()Pump(sout, 'stdout')Pump(serr, 'stderr')while p.poll() is None:# App still workingtry:chan,line = inp.get(timeout = 1.0)if chan=='stdout':print "STDOUT>>", line, "<?<"elif chan=='stderr':print " ERROR==", line, "=?="except Queue.Empty:passprint "Finish"if __name__ == '__main__':__main__()

#6楼

我经常遇到类似的问题。 我经常编写的Python程序需要具有执行一些主要功能的能力,同时还要从命令行(stdin)接受用户输入。 仅将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()阻塞并且没有超时。 如果主要功能已经完成,并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但是不能这样做,因为readline()仍然在另一个线程中等待行。 我发现此问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:

import fcntl
import os
import sys# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)# user input handling thread
while mainThreadIsRunning:try: input = sys.stdin.readline()except: continuehandleInput(input)

在我看来,这比使用选择或信号模块来解决此问题要干净一些,但是再说一次,它仅适用于UNIX ...


#7楼

从JF Sebastian的答案以及其他几个方面的工作出发,我组成了一个简单的子流程管理器。 它提供对请求的非阻塞读取,以及并行运行多个进程。 它不使用任何操作系统特定的调用(据我所知),因此应该可以在任何地方使用。

它可以从pypi获得,因此只需pip install shelljob 。 有关示例和完整文档,请参考项目页面 。


#8楼

Python 3.4为异步IO引入了新的临时API - asyncio模块 。

该方法类似于@Bryan Ward的基于twisted的答案 -定义协议,并在数据准备好后立即调用其方法:

#!/usr/bin/env python3
import asyncio
import osclass SubprocessProtocol(asyncio.SubprocessProtocol):def pipe_data_received(self, fd, data):if fd == 1: # got stdout data (bytes)print(data)def connection_lost(self, exc):loop.stop() # end loop.run_forever()if os.name == 'nt':loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windowsasyncio.set_event_loop(loop)
else:loop = asyncio.get_event_loop()
try:loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, "myprogram.exe", "arg1", "arg2"))loop.run_forever()
finally:loop.close()

请参阅docs中的“子流程” 。

有一个高级接口asyncio.create_subprocess_exec()返回Process对象 ,该对象允许使用StreamReader.readline()协程 (使用async / await Python 3.5+语法 )异步读取一行:

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closingasync def readline_and_kill(*args):# start child processprocess = await asyncio.create_subprocess_exec(*args, stdout=PIPE)# read line (sequence of bytes ending with b'\n') asynchronouslyasync for line in process.stdout:print("got line:", line.decode(locale.getpreferredencoding(False)))breakprocess.kill()return await process.wait() # wait for the child process to exitif sys.platform == "win32":loop = asyncio.ProactorEventLoop()asyncio.set_event_loop(loop)
else:loop = asyncio.get_event_loop()with closing(loop):sys.exit(loop.run_until_complete(readline_and_kill("myprogram.exe", "arg1", "arg2")))

readline_and_kill()执行以下任务:

  • 启动子进程,将其标准输出重定向到管道
  • 从子进程的stdout异步读取一行
  • 杀死子进程
  • 等待它退出

如有必要,每个步骤可能会受到超时秒数的限制。


#9楼

我最近偶然发现了一个相同的问题,我需要在非阻塞模式下一次从流中读取一行(在子进程中运行尾部),我想避免下一个问题:不要刻录cpu,不要按一个字节读取流(就像readline一样),等等

这是我的实现https://gist.github.com/grubberr/5501e1a9760c3eab5e0a它不支持Windows(民意测验),不处理EOF,但是对我来说很好


#10楼

为什么要打扰线程和队列? 与readline()不同,BufferedReader.read1()不会阻塞等待\\ r \\ n,如果有任何输出进入,它将返回ASAP。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import iodef __main__():try:p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )except: print("Popen failed"); quit()sout = io.open(p.stdout.fileno(), 'rb', closefd=False)while True:buf = sout.read1(1024)if len(buf) == 0: breakprint buf,if __name__ == '__main__':__main__()

#11楼

在我的情况下,我需要一个日志记录模块,该模块可以捕获后台应用程序的输出并对其进行扩充(添加时间戳,颜色等)。

我最后得到一个执行实际I / O的后台线程。 以下代码仅适用于POSIX平台。 我剥去了不必要的部分。

如果有人打算长期使用此野兽,请考虑管理开放描述符。 就我而言,这不是一个大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocessclass Logger(threading.Thread):def __init__(self, *modules):threading.Thread.__init__(self)try:from select import epoll, EPOLLINself.__poll = epoll()self.__evt = EPOLLINself.__to = -1except:from select import poll, POLLINprint 'epoll is not available'self.__poll = poll()self.__evt = POLLINself.__to = 100self.__fds = {}self.daemon = Trueself.start()def run(self):while True:events = self.__poll.poll(self.__to)for fd, ev in events:if (ev&self.__evt) != self.__evt:continuetry:self.__fds[fd].run()except Exception, e:print edef add(self, fd, log):assert not self.__fds.has_key(fd)self.__fds[fd] = logself.__poll.register(fd, self.__evt)class log:logger = Logger()def __init__(self, name):self.__name = nameself.__piped = Falsedef fileno(self):if self.__piped:return self.writeself.read, self.write = os.pipe()fl = fcntl.fcntl(self.read, fcntl.F_GETFL)fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)self.fdRead = os.fdopen(self.read)self.logger.add(self.read, self)self.__piped = Truereturn self.writedef __run(self, line):self.chat(line, nl=False)def run(self):while True:try: line = self.fdRead.readline()except IOError, exc:if exc.errno == errno.EAGAIN:returnraiseself.__run(line)def chat(self, line, nl=True):if nl: nl = '\n'else: nl = ''sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))def system(command, param=[], cwd=None, env=None, input=None, output=None):args = [command] + paramp = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)p.wait()ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)date = log('date')
date.chat('go')
system("date", output=date)

#12楼

这是一个支持python中的非阻塞读取和后台写入的模块:

https://pypi.python.org/pypi/python-nonblock

提供功能,

nonblock_read将从流中读取数据(如果有),否则返回一个空字符串(如果流在另一侧关闭并且已读取所有可能的数据,则返回None)

您还可以考虑使用python-subprocess2模块,

https://pypi.python.org/pypi/python-subprocess2

这将添加到子流程模块。 因此,在从“ subprocess.Popen”返回的对象上添加了附加方法runInBackground。 这将启动一个线程并返回一个对象,该对象将在将内容写入stdout / stderr时自动填充,而不会阻塞您的主线程。

请享用!


#13楼

在此添加答案,因为它提供了在Windows和Unix上设置非阻塞管道的功能。

所有ctypes细节都要感谢@techtonik的回答 。

在Unix和Windows系统上都可以使用经过稍微修改的版本。

  • 与Python3兼容(仅需要很小的更改)
  • 包括posix版本,并定义要使用的异常。

这样,您可以对Unix和Windows代码使用相同的功能和异常。

# pipe_non_blocking.py (module)
"""
Example use:p = subprocess.Popen(command,stdout=subprocess.PIPE,)pipe_non_blocking_set(p.stdout.fileno())try:data = os.read(p.stdout.fileno(), 1)except PortableBlockingIOError as ex:if not pipe_non_blocking_is_error_blocking(ex):raise ex
"""__all__ = ("pipe_non_blocking_set","pipe_non_blocking_is_error_blocking","PortableBlockingIOError",)import osif os.name == "nt":def pipe_non_blocking_set(fd):# Constant could define globally but avoid polluting the name-space# thanks to: https://stackoverflow.com/questions/34504970import msvcrtfrom ctypes import windll, byref, wintypes, WinError, POINTERfrom ctypes.wintypes import HANDLE, DWORD, BOOLLPDWORD = POINTER(DWORD)PIPE_NOWAIT = wintypes.DWORD(0x00000001)def pipe_no_wait(pipefd):SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleStateSetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]SetNamedPipeHandleState.restype = BOOLh = msvcrt.get_osfhandle(pipefd)res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)if res == 0:print(WinError())return Falsereturn Truereturn pipe_no_wait(fd)def pipe_non_blocking_is_error_blocking(ex):if not isinstance(ex, PortableBlockingIOError):return Falsefrom ctypes import GetLastErrorERROR_NO_DATA = 232return (GetLastError() == ERROR_NO_DATA)PortableBlockingIOError = OSError
else:def pipe_non_blocking_set(fd):import fcntlfl = fcntl.fcntl(fd, fcntl.F_GETFL)fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)return Truedef pipe_non_blocking_is_error_blocking(ex):if not isinstance(ex, PortableBlockingIOError):return Falsereturn TruePortableBlockingIOError = BlockingIOError

为了避免读取不完整的数据,我最终编写了自己的readline生成器(该生成器返回每行的字节字符串)。

它是一个发电机,因此您可以例如...

def non_blocking_readlines(f, chunk=1024):"""Iterate over lines, yielding b'' when nothings leftor when new data is not yet available.stdout_iter = iter(non_blocking_readlines(process.stdout))line = next(stdout_iter)  # will be a line or b''."""import osfrom .pipe_non_blocking import (pipe_non_blocking_set,pipe_non_blocking_is_error_blocking,PortableBlockingIOError,)fd = f.fileno()pipe_non_blocking_set(fd)blocks = []while True:try:data = os.read(fd, chunk)if not data:# case were reading finishes with no trailing newlineyield b''.join(blocks)blocks.clear()except PortableBlockingIOError as ex:if not pipe_non_blocking_is_error_blocking(ex):raise exyield b''continuewhile True:n = data.find(b'\n')if n == -1:breakyield b''.join(blocks) + data[:n + 1]data = data[n + 1:]blocks.clear()blocks.append(data)

#14楼

这无阻塞读的版本不需要特殊的模块,并在大多数Linux发行版的工作外的开箱。

import os
import sys
import time
import fcntl
import subprocessdef async_read(fd):# set non-blocking flag while preserving old flagsfl = fcntl.fcntl(fd, fcntl.F_GETFL)fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)# read char until EOF hitwhile True:try:ch = os.read(fd.fileno(), 1)# EOFif not ch: break                                                                                                                                                              sys.stdout.write(ch)except OSError:# waiting for data be available on fdpassdef shell(args, async=True):# merge stderr and stdoutproc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)if async: async_read(proc.stdout)sout, serr = proc.communicate()return (sout, serr)if __name__ == '__main__':cmd = 'ping 8.8.8.8'sout, serr = shell(cmd.split())

#15楼

一种解决方案是使另一个进程执行该进程的读取,或者使该进程的线程超时。

这是超时功能的线程版本:

http://code.activestate.com/recipes/473878/

但是,您需要在输入stdout时对其进行阅读吗? 另一个解决方案可能是将输出转储到文件中,并等待使用p.wait()完成该过程。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()str = open('myprogram_output.txt','r').read()

#16楼

选择模块可帮助您确定下一个有用输入的位置。

但是,您几乎总是对单独的线程感到满意。 一个阻止读取标准输入,另一种则在您不希望阻止的位置进行。


#17楼

这是在子进程中运行交互式命令的示例,并且stdout是使用伪终端进行交互的。 您可以参考: https : //stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popencommand = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,preexec_fn=os.setsid,stdin=slave_fd,stdout=slave_fd,stderr=slave_fd,universal_newlines=True)while p.poll() is None:r, w, e = select.select([sys.stdin, master_fd], [], [])if sys.stdin in r:d = os.read(sys.stdin.fileno(), 10240)os.write(master_fd, d)elif master_fd in r:o = os.read(master_fd, 10240)if o:os.write(sys.stdout.fileno(), o)# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

#18楼

尝试使用asyncproc模块。 例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")while True:# check to see if process has endedpoll = myProc.wait(os.WNOHANG)if poll != None:break# print any new outputout = myProc.read()if out != "":print out

该模块负责S.Lott建议的所有线程。


#19楼

我的问题有点不同,因为我想从正在运行的进程中同时收集stdout和stderr,但最终还是一样,因为我想在小部件中生成生成的输出。

我不想诉诸使用Queues或其他线程的许多建议的解决方法,因为它们对于执行诸如运行另一个脚本并收集其输出之类的常见任务不是必需的。

阅读建议的解决方案和python文档后,我通过以下实现解决了我的问题。 是的,它仅适用于POSIX,因为我正在使用select函数调用。

我同意这些文档令人困惑,并且对于这种常见的脚本编写任务而言,实现很尴尬。 我认为python的较早版本对Popen默认设置不同,解释也不同,因此造成了很多混乱。 这对于Python 2.7.12和3.5.2似乎都很好。

关键是将bufsize=1设置为行缓冲,然后将universal_newlines=True作为文本文件而不是二进制文件进行处理,而二进制文件似乎在设置bufsize=1时成为默认文件。

class workerThread(QThread):def __init__(self, cmd):QThread.__init__(self)self.cmd = cmdself.result = None           ## return codeself.error = None            ## flag indicates an errorself.errorstr = ""           ## info message about the errordef __del__(self):self.wait()DEBUG("Thread removed")def run(self):cmd_list = self.cmd.split(" ")   try:cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None, universal_newlines=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)except OSError:self.error = 1self.errorstr = "Failed to execute " + self.cmdERROR(self.errorstr)finally:VERBOSE("task started...")import selectwhile True:try:r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])if cmd.stderr in r:line = cmd.stderr.readline()if line != "":line = line.strip()self.emit(SIGNAL("update_error(QString)"), line)if cmd.stdout in r:line = cmd.stdout.readline()if line == "":breakline = line.strip()self.emit(SIGNAL("update_output(QString)"), line)except IOError:passcmd.wait()self.result = cmd.returncodeif self.result < 0:self.error = 1self.errorstr = "Task terminated by signal " + str(self.result)ERROR(self.errorstr)returnif self.result:self.error = 1self.errorstr = "exit code " + str(self.result)ERROR(self.errorstr)returnreturn

ERROR,DEBUG和VERBOSE只是将输出打印到终端的宏。

该解决方案的IMHO 99.99%有效,因为它仍然使用阻塞的readline函数,因此我们认为子过程很好并且可以输出完整的行。

我欢迎反馈以改进解决方案,因为我还是Python的新手。


#20楼

该解决方案使用select模块从IO流中“读取任何可用数据”。 此功能最初会阻塞,直到有可用数据为止,然后仅读取可用数据,并且不会进一步阻塞。

考虑到它使用select模块,因此仅在Unix上有效。

该代码完全符合PEP8。

import selectdef read_available(input_stream, max_bytes=None):"""Blocks until any data is available, then all available data is then read and returned.This function returns an empty string when end of stream is reached.Args:input_stream: The stream to read from.max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.Returns:str"""# Prepare local variablesinput_streams = [input_stream]empty_list = []read_buffer = ""# Initially block for input using 'select'if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:# Poll read-readiness using 'select'def select_func():return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0# Create while function based on parametersif max_bytes is not None:def while_func():return (len(read_buffer) < max_bytes) and select_func()else:while_func = select_funcwhile True:# Read single byte at a timeread_data = input_stream.read(1)if len(read_data) == 0:# End of streambreak# Append byte to string bufferread_buffer += read_data# Check if more data is availableif not while_func():break# Return read bufferreturn read_buffer

#21楼

使用select&read(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): while (select.select([proc.stdout],[],[],0)[0]!=[]):   retVal+=proc.stdout.read(1)return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():print (readAllSoFar(p))

对于类似readline()的:

lines = ['']
while not p.poll():lines = readAllSoFar(p, lines[-1]).split('\n')for a in range(len(lines)-1):print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):print a

#22楼

fcntlselectasyncproc在这种情况下将无济于事。

不管使用什么操作系统,一种可靠地读取流而不阻塞的可靠方法是使用Queue.get_nowait()

import sys
from subprocess import PIPE, Popen
from threading  import Threadtry:from queue import Queue, Empty
except ImportError:from Queue import Queue, Empty  # python 2.xON_POSIX = 'posix' in sys.builtin_module_namesdef enqueue_output(out, queue):for line in iter(out.readline, b''):queue.put(line)out.close()p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()# ... do other things here# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:print('no output yet')
else: # got line# ... do something with line

#23楼

我还遇到了Jesse所描述的问题,并像Bradley , Andy和其他人一样使用“选择”来解决该问题,但是在阻塞模式下避免了繁忙的循环。 它使用虚拟管道作为伪造的标准输入。 选择将阻塞并等待标准输入或管道准备就绪。 按下键时,stdin会取消选择的阻塞,并且可以使用read(1)检索键值。 当不同的线程写入管道时,管道将解除对选择的阻塞,并且可以将其视为对stdin的需求已结束的指示。 这是一些参考代码:

import sys
import os
from select import select# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")# -------------------------------------------------------------------------
def getKey():# Wait for stdin or pipe (fake stdin) to be readydr,dw,de = select([sys.__stdin__, readFile], [], [])# If stdin is the one ready then read it and return valueif sys.__stdin__ in dr:return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt# Must finishelse:return None# -------------------------------------------------------------------------
def breakStdinRead():writeFile.write(' ')writeFile.flush()# -------------------------------------------------------------------------
# MAIN CODE# Get key stroke
key = getKey()# Keyboard input
if key:# ... do your stuff with the key value# Faked keystroke
else:# ... use of stdin finished# -------------------------------------------------------------------------
# OTHER THREAD CODEbreakStdinRead()

#24楼

我有原始发问者的问题,但不希望调用线程。 我将Jesse的解决方案与管道中的直接read()以及我自己的用于行读取的缓冲区处理程序混合在一起(但是,我的子进程ping总是写完整的行<系统页面大小)。 通过仅阅读通过gobject注册的io手表,可以避免繁忙的等待。 这些天,我通常在gobject MainLoop中运行代码以避免线程。

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

观察者是

def watch(f, *other):
print 'reading',f.read()
return True

并且主程序设置ping,然后调用gobject邮件循环。

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

其他任何工作都附加到gobject中的回调中。


#25楼

在现代Python中,情况要好得多。

这是一个简单的子程序“ hello.py”:

#!/usr/bin/env python3while True:i = input()if i == "quit":breakprint(f"hello {i}")

和一个与之交互的程序:

import asyncioasync def main():proc = await asyncio.subprocess.create_subprocess_exec("./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)proc.stdin.write(b"bob\n")print(await proc.stdout.read(1024))proc.stdin.write(b"alice\n")print(await proc.stdout.read(1024))proc.stdin.write(b"quit\n")await proc.wait()asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际模式(在此处以及在相关问题中,几乎所有先前的答案也是如此)是将子级的stdout文件描述符设置为非阻塞,然后在某种选择循环中对其进行轮询。 这些天,当然,该循环是由asyncio提供的。


#26楼

我添加此问题以读取一些subprocess.Popen stdout。 这是我的非阻塞读取解决方案:

import fcntldef non_block_read(output):fd = output.fileno()fl = fcntl.fcntl(fd, fcntl.F_GETFL)fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)try:return output.read()except:return ""# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

#27楼

您可以在Twisted中非常轻松地执行此操作。 根据您现有的代码库,这可能不是那么容易使用,但是如果您正在构建一个扭曲的应用程序,那么类似这样的事情将变得微不足道。 您创建一个ProcessProtocol类,并覆盖outReceived()方法。 扭曲(取决于所使用的反应堆)通常只是一个很大的select()循环,其中安装了用于处理来自不同文件描述符(通常是网络套接字)的数据的回调。 因此, outReceived()方法仅安装用于处理来自STDOUT数据的回调。 演示此行为的简单示例如下:

from twisted.internet import protocol, reactorclass MyProcessProtocol(protocol.ProcessProtocol):def outReceived(self, data):print dataproc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted文档对此有一些很好的信息。

如果您围绕Twisted构建整个应用程序,它将与本地或远程的其他进程进行异步通信,就像这样非常优雅。 另一方面,如果您的程序不是基于Twisted构建的,那么它实际上并没有那么大的帮助。 希望这对其他读者有帮助,即使它不适用于您的特定应用程序。

在Python中对子进程进行非阻塞读取相关推荐

  1. linux 管道非阻塞,在Linux中管道上的非阻塞读取

    可以在管道上进行非阻塞I / O吗? fcntl无法设置O_NONBLOCK. Linux编程接口的页面918包括一个表'从管道读取n个字节或FIFO(p)'的语义.此表列出了管道和FIFO的行为,其 ...

  2. python中的进程池:multiprocessing.Pool()

    python中的进程池: 我们可以写出自己希望进程帮助我们完成的任务,然后把任务批量交给进程池 进程池帮助我们创建进程完成任务,不需要我们管理. 进程池:利用multiprocessing 下的Poo ...

  3. python文件读取方法read(size)的含义是_在Python中可使用read([size])来读取文件中的数据,如果参数size省略,则读取文件中的()。...

    [单选题]李明在他所属的公司工作五年,每天都很认真地处理繁杂的事情,同事们都夸他认真,但是依然没有建树,这是因为: [多选题]品牌标志的作用表现在 [单选题]新产品开发的第一个阶段是_______. ...

  4. python文件读取方法read(size)的含义是_在Python中可使用read([size])来读取文件中的数据,如果参数size省略,则读取文件中的()。(4.0分)_学小易找答案...

    [单选题]文本文件存储的是(),由若干文本行组成,通常每行以换行符 '\n' 结尾.(4.0分) [单选题]()属性是返回被打开文件的访问模式.(4.0分) [单选题]重力坝是由砼或( )修筑而成的大 ...

  5. 在Python中FITS格式文件数据的读取 (转载)

    在Python中FITS格式文件数据的读取 (转载) 前言 \space\space\space\space     FITS(Flexible Image Transport System)格式文件 ...

  6. Python 中的进程、线程、协程、同步、异步、回调(一)

    在进一步之前,让我们先回顾一下各种上下文切换技术. 不过首先说明一点术语.当我们说"上下文"的时候,指的是程序在执行中的一个状态.通常我们会用调用栈来表示这个状态--栈记载了每个调 ...

  7. [Python]再学 socket 之非阻塞 Server

    再学 socket 之非阻塞 Server 本文是基于 python2.7 实现,运行于 Mac 系统下 本篇文章是上一篇初探 socket 的续集, 上一篇文章介绍了:如何建立起一个基本的 sock ...

  8. 用于Python中的进程和系统监视的跨平台库psutil

    最近平台运行,出现一些问题,考虑如何设置监控.发现一个Python库,感觉非常实用.分享一下. psutil(Process and system实用程序)是一个跨平台库,用于检索运行过程和系统利用( ...

  9. Python中的进程和线程(20)

    进程和线程 进程 创建多进程 进程和全局变量 传递参数和进程号 进程的状态 进程之间通信 put() get()方法 生产者和消费者 进程子类化 生产者和消费者子类化 进程池 线程 线程子类化 共享全 ...

最新文章

  1. 全“芯”关注用户需求 AMD“超轻薄笔记本”杀出重围
  2. [Python]urllib库的简单应用-实现北航宿舍自动上网
  3. 日期格式转换 java 2016-09-03T00:00:00.000+08:00
  4. 秒杀系统流量削峰,这事应该怎么做?
  5. DES算法C语言实现
  6. 如何与室友相处?2017-12-08
  7. 免费下载|《云原生时代下的App开发》走进阿里云一站式应用研发平台EMAS
  8. oracle blob查重,如何解决oracle blob字段 的乱码问题
  9. ArcGIS Pro 简明教程(3)数据编辑
  10. 用vue-cli+iview做项目不兼容ie问题
  11. 计算机桌面如何分区,讲解电脑如何分区
  12. 宝马屏幕共享android,宝马屏幕共享功能怎么用
  13. java计算机毕业设计高校多媒体设备报修管理系统源码+mysql数据库+系统+lw文档+部署
  14. 电路与电子技术课程设计报告(正弦、方波-三角波、可调矩形波、可调锯齿波发生器)
  15. 基于JavaWeb的网上订餐网站设计与实现 毕业论文+任务书+外文翻译及原文+答辩PPT+项目源码及数据库文件
  16. 用c语言求五位回文数,C语言求回文数(详解版)
  17. 高瓴投的澳斯康生物冲刺科创板:年营收4.5亿 丢掉与康希诺合作
  18. 简单四步抓取腾讯视频MP4文件
  19. 闪动的TextView
  20. Java每天10道面试题,跟我走,offer有!(十)

热门文章

  1. 【剑指offer-Java版】40数组中只出现一次的数字
  2. Visual Studio 2012 Ultimate 下载和密钥以及配置文件的导入
  3. Android10.0 日志系统分析(三)-logd、logcat读写日志源码分析-[Android取经之路]
  4. jquery mobile实例
  5. oauth2 单点登录_Spring Security Oauth2和Spring Boot实现单点登录
  6. uniapp自定义顶部导航组件
  7. MySQL如何修改密码
  8. Jzoj4729 道路修建
  9. 清北学堂模拟赛d5t4 套路
  10. winform控件大小改变是防止背景重绘导致的闪烁