Использование модуля "подпроцесс" с таймаутом
вот код Python для запуска произвольной команды, возвращающей его stdout данные, или вызвать исключение для ненулевых кодов выхода:
proc = subprocess.Popen(
cmd,
stderr=subprocess.STDOUT, # Merge stdout and stderr
stdout=subprocess.PIPE,
shell=True)
communicate используется для ожидания завершения процесса:
stdoutdata, stderrdata = proc.communicate()
The subprocess модуль не поддерживает тайм-аут-возможность убить процесс, выполняемый на более чем X секунд, поэтому communicate может занять целую вечность, чтобы бежать.
что такое простой способ реализации таймаутов в Python программа предназначена для работы на Windows и Linux?
27 ответов:
В Python 3.3+:
from subprocess import STDOUT, check_output output = check_output(cmd, stderr=STDOUT, timeout=seconds)
output- это байтовая строка, содержащая объединенные данные stdout, stderr команды.этот код вызывает
CalledProcessErrorв ненулевом состоянии выхода, как указано в тексте вопроса, в отличие отproc.communicate()метод.Я убрал
shell=Trueпотому что он часто используется без необходимости. Вы всегда можете добавить его обратно, еслиcmdдействительно требует этого. Если вы добавитеshell=Trueт. е. если дочерний процесс порождает своих собственных потомков;check_output()может вернуться намного позже, чем указывает тайм-аут, см. сбой таймаута подпроцесса.функция тайм-аута доступна на Python 2.х через
subprocess32backport модуля подпроцесса 3.2+.
Я не знаю много о низкоуровневых деталях, но, учитывая, что в в Python 2.6 API-интерфейса обеспечивает возможность ждать потоков и завершите процессы, как насчет запуска процесса в отдельном нить?
import subprocess, threading class Command(object): def __init__(self, cmd): self.cmd = cmd self.process = None def run(self, timeout): def target(): print 'Thread started' self.process = subprocess.Popen(self.cmd, shell=True) self.process.communicate() print 'Thread finished' thread = threading.Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): print 'Terminating process' self.process.terminate() thread.join() print self.process.returncode command = Command("echo 'Process started'; sleep 2; echo 'Process finished'") command.run(timeout=3) command.run(timeout=1)вывод этого фрагмента в моей машине:
Thread started Process started Process finished Thread finished 0 Thread started Process started Terminating process Thread finished -15где видно, что при первом выполнении процесс закончено правильно (код возврата 0), в то время как во втором процесс был завершен (код возврата -15).
Я не тестировал в windows; но, помимо обновления примера команда, я думаю, что это должно работать, так как я не нашел в документация все, что говорит этот поток.присоединиться или обработать.прекратить не поддерживаемый.
ответ jcollado может быть упрощен с помощью потоков.Таймер класс:
import shlex from subprocess import Popen, PIPE from threading import Timer def run(cmd, timeout_sec): proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE) timer = Timer(timeout_sec, proc.kill) try: timer.start() stdout, stderr = proc.communicate() finally: timer.cancel() # Examples: both take 1 second run("sleep 1", 5) # process ends normally at 1 second run("sleep 5", 1) # timeout happens at 1 second
Если вы на Unix,
import signal ... class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(5*60) # 5 minutes try: stdoutdata, stderrdata = proc.communicate() signal.alarm(0) # reset the alarm except Alarm: print "Oops, taking too long!" # whatever else
вот решение Алекса Мартелли как модуль с правильным процессом убийства. Другие подходы не работают, потому что они не используют proc.общаться.)( Поэтому, если у вас есть процесс, который производит много выходных данных, он заполнит свой выходной буфер, а затем заблокирует, пока вы не прочитаете что-то из него.
from os import kill from signal import alarm, signal, SIGALRM, SIGKILL from subprocess import PIPE, Popen def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None): ''' Run a command with a timeout after which it will be forcibly killed. ''' class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env) if timeout != -1: signal(SIGALRM, alarm_handler) alarm(timeout) try: stdout, stderr = p.communicate() if timeout != -1: alarm(0) except Alarm: pids = [p.pid] if kill_tree: pids.extend(get_process_children(p.pid)) for pid in pids: # process might have died before getting to this line # so wrap to avoid OSError: no such process try: kill(pid, SIGKILL) except OSError: pass return -9, '', '' return p.returncode, stdout, stderr def get_process_children(pid): p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True, stdout = PIPE, stderr = PIPE) stdout, stderr = p.communicate() return [int(p) for p in stdout.split()] if __name__ == '__main__': print run('find /', shell = True, timeout = 3) print run('find', shell = True)
я модифицировал sussudio ответ. Теперь функция возвращает: (
returncode,stdout,stderr,timeout) -stdoutиstderrдекодируется в строку utf-8def kill_proc(proc, timeout): timeout["value"] = True proc.kill() def run(cmd, timeout_sec): proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) timeout = {"value": False} timer = Timer(timeout_sec, kill_proc, [proc, timeout]) timer.start() stdout, stderr = proc.communicate() timer.cancel() return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]
удивлен, что никто не упоминал об использовании
timeout
timeout 5 ping -c 3 somehostэто не будет работать для каждого случая использования, очевидно, но если вы имеете дело с простым скриптом, это трудно превзойти.
также доступно как gtimeout в coreutils через
homebrewдля пользователей mac.
другой вариант-записать во временный файл, чтобы предотвратить блокировку stdout вместо необходимости опроса с помощью communicate (). Это сработало для меня, где другие ответы не сделали; например, на windows.
outFile = tempfile.SpooledTemporaryFile() errFile = tempfile.SpooledTemporaryFile() proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False) wait_remaining_sec = timeout while proc.poll() is None and wait_remaining_sec > 0: time.sleep(1) wait_remaining_sec -= 1 if wait_remaining_sec <= 0: killProc(proc.pid) raise ProcessIncompleteError(proc, timeout) # read temp streams from start outFile.seek(0); errFile.seek(0); out = outFile.read() err = errFile.read() outFile.close() errFile.close()
timeoutподдержка bycall()иcommunicate()в модуле подпроцесса (начиная с Python3. 3):import subprocess subprocess.call("command", timeout=20, shell=True)это вызовет команду и вызовет исключение
subprocess.TimeoutExpiredесли команда не завершается через 20 секунд.
затем вы можете обработать исключение, чтобы продолжить свой код, что-то вроде:
try: subprocess.call("command", timeout=20, shell=True) except subprocess.TimeoutExpired: # insert code hereнадеюсь, что это помогает.
вот мое решение, я использовал поток и событие:
import subprocess from threading import Thread, Event def kill_on_timeout(done, timeout, proc): if not done.wait(timeout): proc.kill() def exec_command(command, timeout): done = Event() proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc)) watcher.daemon = True watcher.start() data, stderr = proc.communicate() done.set() return data, stderr, proc.returncodeдействие:
In [2]: exec_command(['sleep', '10'], 5) Out[2]: ('', '', -9) In [3]: exec_command(['sleep', '10'], 11) Out[3]: ('', '', 0)
решение, которое я использую, - это префикс команды оболочки с timelimit. Если команда занимает слишком много времени, timelimit остановит ее, и у Popen будет код возврата, установленный timelimit. Если это > 128, это означает, что timelimit убил процесс.
см. также подпроцесс python с таймаутом и большим выходом (>64K)
я добавил решение с резьбой от
jcolladoв мой модуль Python easyprocess.установка:
pip install easyprocessпример:
from easyprocess import Proc # shell is not supported! stdout=Proc('ping localhost').call(timeout=1.5).stdout print stdout
Если вы используете python 2, попробуйте
import subprocess32 try: output = subprocess32.check_output(command, shell=True, timeout=3) except subprocess32.TimeoutExpired as e: print e
я реализовал то, что я мог собрать из нескольких из них. Это работает в Windows, и поскольку это Вики-сообщество, я думаю, что я также поделюсь своим кодом:
class Command(threading.Thread): def __init__(self, cmd, outFile, errFile, timeout): threading.Thread.__init__(self) self.cmd = cmd self.process = None self.outFile = outFile self.errFile = errFile self.timed_out = False self.timeout = timeout def run(self): self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \ stderr = self.errFile) while (self.process.poll() is None and self.timeout > 0): time.sleep(1) self.timeout -= 1 if not self.timeout > 0: self.process.terminate() self.timed_out = True else: self.timed_out = Falseзатем из другого класса или файл:
outFile = tempfile.SpooledTemporaryFile() errFile = tempfile.SpooledTemporaryFile() executor = command.Command(c, outFile, errFile, timeout) executor.daemon = True executor.start() executor.join() if executor.timed_out: out = 'timed out' else: outFile.seek(0) errFile.seek(0) out = outFile.read() err = errFile.read() outFile.close() errFile.close()
Как только вы поймете полный процесс запуска машин в *unix, вы легко найдете более простое решение:
рассмотрим этот простой пример, как сделать timeoutable communicate() meth с помощью select.выберите () (доступный alsmost everythere на *nix в настоящее время). Это также может быть написано с помощью epoll / poll / kqueue, но выберите.вариант select () может быть хорошим примером для вас. И основные ограничения выбора.выберите () (Скорость и 1024 max fds) не применимы для вашего задача.
это работает под *nix, не создает потоки, не использует сигналы, может быть lauched из любого потока (не только основной), и достаточно быстро, чтобы прочитать 250 МБ/с данных из stdout на моей машине (i5 2.3 ghz).
существует проблема в присоединении stdout/stderr в конце связи. Если у вас есть огромный выход программы, это может привести к большой памяти. Но вы можете вызвать communicate () несколько раз с меньшими таймаутами.
class Popen(subprocess.Popen): def communicate(self, input=None, timeout=None): if timeout is None: return subprocess.Popen.communicate(self, input) if self.stdin: # Flush stdio buffer, this might block if user # has been writing to .stdin in an uncontrolled # fashion. self.stdin.flush() if not input: self.stdin.close() read_set, write_set = [], [] stdout = stderr = None if self.stdin and input: write_set.append(self.stdin) if self.stdout: read_set.append(self.stdout) stdout = [] if self.stderr: read_set.append(self.stderr) stderr = [] input_offset = 0 deadline = time.time() + timeout while read_set or write_set: try: rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time())) except select.error as ex: if ex.args[0] == errno.EINTR: continue raise if not (rlist or wlist): # Just break if timeout # Since we do not close stdout/stderr/stdin, we can call # communicate() several times reading data by smaller pieces. break if self.stdin in wlist: chunk = input[input_offset:input_offset + subprocess._PIPE_BUF] try: bytes_written = os.write(self.stdin.fileno(), chunk) except OSError as ex: if ex.errno == errno.EPIPE: self.stdin.close() write_set.remove(self.stdin) else: raise else: input_offset += bytes_written if input_offset >= len(input): self.stdin.close() write_set.remove(self.stdin) # Read stdout / stderr by 1024 bytes for fn, tgt in ( (self.stdout, stdout), (self.stderr, stderr), ): if fn in rlist: data = os.read(fn.fileno(), 1024) if data == '': fn.close() read_set.remove(fn) tgt.append(data) if stdout is not None: stdout = ''.join(stdout) if stderr is not None: stderr = ''.join(stderr) return (stdout, stderr)
Вы можете сделать это с помощью
selectimport subprocess from datetime import datetime from select import select def call_with_timeout(cmd, timeout): started = datetime.now() sp = subprocess.Popen(cmd, stdout=subprocess.PIPE) while True: p = select([sp.stdout], [], [], timeout) if p[0]: p[0][0].read() ret = sp.poll() if ret is not None: return ret if (datetime.now()-started).total_seconds() > timeout: sp.kill() return None
Я не знаю, почему это не упоминается, но с Python 3.5, есть новый
subprocess.runуниверсальная команда (которая предназначена для заменыcheck_call,check_output...) и который имеетtimeoutпараметр также.подпроцесс.запуска(параметр args, *, ввода=нет, входной сигнал=нет, стандартный вывод=Нет, поток stderr=нет, Шелл=false, то текущий рабочий каталог=нет, таймаут=нет, проверить=false, то кодировка=нет ошибок=нет)
Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.вызывает
subprocess.TimeoutExpiredисключения, когда тайм-аут истeкший.
Я использовал killableprocess успешно на Windows, Linux и Mac. Если вы используете Cygwin Python, вам понадобится версия осаф killableprocess потому что в противном случае собственные процессы Windows не будут убиты.
хотя я не смотрел на него широко, это оформителя Я нашел в ActiveState, кажется, очень полезно для такого рода вещей. Вместе с
subprocess.Popen(..., close_fds=True), по крайней мере, я готов к shell-скриптов в Python.
есть идея подкласса Popen класса и расширить его с помощью некоторых простых методов декораторов. Назовем это ExpirablePopen.
from logging import error from subprocess import Popen from threading import Event from threading import Thread class ExpirablePopen(Popen): def __init__(self, *args, **kwargs): self.timeout = kwargs.pop('timeout', 0) self.timer = None self.done = Event() Popen.__init__(self, *args, **kwargs) def __tkill(self): timeout = self.timeout if not self.done.wait(timeout): error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout)) self.kill() def expirable(func): def wrapper(self, *args, **kwargs): # zero timeout means call of parent method if self.timeout == 0: return func(self, *args, **kwargs) # if timer is None, need to start it if self.timer is None: self.timer = thr = Thread(target=self.__tkill) thr.daemon = True thr.start() result = func(self, *args, **kwargs) self.done.set() return result return wrapper wait = expirable(Popen.wait) communicate = expirable(Popen.communicate) if __name__ == '__main__': from subprocess import PIPE print ExpirablePopen('ssh -T [email protected]', stdout=PIPE, timeout=1).communicate()
у меня была проблема, что я хотел завершить многопоточный подпроцесс, если это заняло больше времени, чем заданная длина таймаута. Я хотел установить тайм-аут в
Popen(), но это не сработало. Тогда я понял, чтоPopen().wait()равнаcall()и поэтому у меня возникла идея установить тайм-аут внутри.wait(timeout=xxx)метод, который, наконец, работал. Таким образом, я решил это так:import os import sys import signal import subprocess from multiprocessing import Pool cores_for_parallelization = 4 timeout_time = 15 # seconds def main(): jobs = [...YOUR_JOB_LIST...] with Pool(cores_for_parallelization) as p: p.map(run_parallel_jobs, jobs) def run_parallel_jobs(args): # Define the arguments including the paths initial_terminal_command = 'C:\Python34\python.exe' # Python executable function_to_start = 'C:\temp\xyz.py' # The multithreading script final_list = [initial_terminal_command, function_to_start] final_list.extend(args) # Start the subprocess and determine the process PID subp = subprocess.Popen(final_list) # starts the process pid = subp.pid # Wait until the return code returns from the function by considering the timeout. # If not, terminate the process. try: returncode = subp.wait(timeout=timeout_time) # should be zero if accomplished except subprocess.TimeoutExpired: # Distinguish between Linux and Windows and terminate the process if # the timeout has been expired if sys.platform == 'linux2': os.kill(pid, signal.SIGTERM) elif sys.platform == 'win32': subp.terminate() if __name__ == '__main__': main()
добавление команды Linux
timeoutэто не плохое решение, и это сработало для меня.cmd = "timeout 20 "+ cmd subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) (output, err) = p.communicate()
к сожалению, я связан очень строгой политикой по раскрытию исходного кода моим работодателем, поэтому я не могу предоставить фактический код. Но, на мой вкус, лучшее решение заключается в создании подкласса переопределение
Popen.wait()опрашивать вместо того, чтобы ждать бесконечно, иPopen.__init__принять параметр timeout. Как только вы это сделаете, все остальныеPopenметоды (которые называютwait) будет работать, как ожидалось, в том числеcommunicate.
https://pypi.python.org/pypi/python-subprocess2 предоставляет расширения для модуля подпроцесса, которые позволяют ждать до определенного периода времени, в противном случае завершить.
Итак, чтобы дождаться завершения процесса до 10 секунд, в противном случае убейте:
pipe = subprocess.Popen('...') timeout = 10 results = pipe.waitOrTerminate(timeout)это совместимо как с windows, так и с unix. "результаты" - это словарь, он содержит "код возврата", который является возвращением приложения (или нет, если его нужно было убить), а также "actionTaken". который будет "SUBPROCESS2_PROCESS_COMPLETED", если процесс завершен нормально, или маска "SUBPROCESS2_PROCESS_TERMINATED" и SUBPROCESS2_PROCESS_KILLED в зависимости от предпринятых действий (см. документацию для получения полной информации)
Это решение убивает дерево процессов в случае shell=True, передает параметры процессу (или нет), имеет таймаут и получает stdout, stderr и вывод процесса обратного вызова (он использует psutil для kill_proc_tree). Это было основано на нескольких решениях, опубликованных в SO, включая Jcollado. Posting в ответ на комментарии Anson и jradice в ответе jcollado. Протестировано в Windows Srvr 2012 и Ubuntu 14.04. Обратите внимание, что для Ubuntu вам нужно изменить родителя.дети.(..) звоните родителям.get_children(...).
def kill_proc_tree(pid, including_parent=True): parent = psutil.Process(pid) children = parent.children(recursive=True) for child in children: child.kill() psutil.wait_procs(children, timeout=5) if including_parent: parent.kill() parent.wait(5) def run_with_timeout(cmd, current_dir, cmd_parms, timeout): def target(): process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) # wait for the process to terminate if (cmd_parms == ""): out, err = process.communicate() else: out, err = process.communicate(cmd_parms) errcode = process.returncode thread = Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): me = os.getpid() kill_proc_tree(me, including_parent=False) thread.join()
import subprocess, optparse, os, sys, re, datetime, threading, time, glob, shutil, xml.dom.minidom, traceback class OutputManager: def __init__(self, filename, mode, console, logonly): self.con = console self.logtoconsole = True self.logtofile = False if filename: try: self.f = open(filename, mode) self.logtofile = True if logonly == True: self.logtoconsole = False except IOError: print (sys.exc_value) print ("Switching to console only output...\n") self.logtofile = False self.logtoconsole = True def write(self, data): if self.logtoconsole == True: self.con.write(data) if self.logtofile == True: self.f.write(data) sys.stdout.flush() def getTimeString(): return time.strftime("%Y-%m-%d", time.gmtime()) def runCommand(command): ''' Execute a command in new thread and return the stdout and stderr content of it. ''' try: Output = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True).communicate()[0] except Exception as e: print ("runCommand failed :%s" % (command)) print (str(e)) sys.stdout.flush() return None return Output def GetOs(): Os = "" if sys.platform.startswith('win32'): Os = "win" elif sys.platform.startswith('linux'): Os = "linux" elif sys.platform.startswith('darwin'): Os = "mac" return Os def check_output(*popenargs, **kwargs): try: if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') # Get start time. startTime = datetime.datetime.now() timeoutValue=3600 cmd = popenargs[0] if sys.platform.startswith('win32'): process = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True) elif sys.platform.startswith('linux'): process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) elif sys.platform.startswith('darwin'): process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) stdoutdata, stderrdata = process.communicate( timeout = timeoutValue ) retcode = process.poll() #################################### # Catch crash error and log it. #################################### OutputHandle = None try: if retcode >= 1: OutputHandle = OutputManager( 'CrashJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False) OutputHandle.write( cmd ) print (stdoutdata) print (stderrdata) sys.stdout.flush() except Exception as e: print (str(e)) except subprocess.TimeoutExpired: #################################### # Catch time out error and log it. #################################### Os = GetOs() if Os == 'win': killCmd = "taskkill /FI \"IMAGENAME eq {0}\" /T /F" elif Os == 'linux': killCmd = "pkill {0)" elif Os == 'mac': # Linux, Mac OS killCmd = "killall -KILL {0}" runCommand(killCmd.format("java")) runCommand(killCmd.format("YouApp")) OutputHandle = None try: OutputHandle = OutputManager( 'KillJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False) OutputHandle.write( cmd ) except Exception as e: print (str(e)) except Exception as e: for frame in traceback.extract_tb(sys.exc_info()[2]): fname,lineno,fn,text = frame print "Error in %s on line %d" % (fname, lineno)
просто пытался написать что-то попроще.
#!/usr/bin/python from subprocess import Popen, PIPE import datetime import time popen = Popen(["/bin/sleep", "10"]); pid = popen.pid sttime = time.time(); waittime = 3 print "Start time %s"%(sttime) while True: popen.poll(); time.sleep(1) rcode = popen.returncode now = time.time(); if [ rcode is None ] and [ now > (sttime + waittime) ] : print "Killing it now" popen.kill()
Comments