13 ответов:
можно использовать сигнал пакет, если вы работаете на UNIX:
In [1]: import signal # Register an handler for the timeout In [2]: def handler(signum, frame): ...: print "Forever is over!" ...: raise Exception("end of time") ...: # This function *may* run for an indetermined time... In [3]: def loop_forever(): ...: import time ...: while 1: ...: print "sec" ...: time.sleep(1) ...: ...: # Register the signal function handler In [4]: signal.signal(signal.SIGALRM, handler) Out[4]: 0 # Define a timeout for your function In [5]: signal.alarm(10) Out[5]: 0 In [6]: try: ...: loop_forever() ...: except Exception, exc: ...: print exc ....: sec sec sec sec sec sec sec sec Forever is over! end of time # Cancel the timer if the function returned before timeout # (ok, mine won't but yours maybe will :) In [7]: signal.alarm(0) Out[7]: 010 секунд после звонка
alarm.alarm(10)обработчик вызывается. Это вызывает исключение, которое можно перехватить из обычного кода Python.этот модуль не может работать с потоками (но тогда, кто же?)
отметим, что поскольку мы создаем исключение, когда происходит тайм-аут, оно может быть поймано и проигнорировано внутри функции, например одного такая функция:
def loop_forever(): while 1: print 'sec' try: time.sleep(10) except: continue
можно использовать
multiprocessing.Processсделать именно это.код
import multiprocessing import time # bar def bar(): for i in range(100): print "Tick" time.sleep(1) if __name__ == '__main__': # Start bar as a process p = multiprocessing.Process(target=bar) p.start() # Wait for 10 seconds or until process finishes p.join(10) # If thread is still active if p.is_alive(): print "running... let's kill it..." # Terminate p.terminate() p.join()
как я могу вызвать функцию или что я обернуть его так, что если это займет больше 5 секунд сценарий отменяет его?
я написал суть это решает этот вопрос / проблему с декоратором и
threading.Timer. Вот она с поломкой.импорт и настройки для совместимости
он был протестирован с Python 2 и 3. Он также должен работать под Unix/Linux и Windows.
сначала импорт. Они пытаются сохранить согласованность кода независимо от версии Python:
from __future__ import print_function import sys import threading from time import sleep try: import thread except ImportError: import _thread as threadиспользуйте независимый от версии код:
try: range, _print = xrange, print def print(*args, **kwargs): flush = kwargs.pop('flush', False) _print(*args, **kwargs) if flush: kwargs.get('file', sys.stdout).flush() except NameError: passтеперь мы импортировали нашу функциональность из стандартной библиотеки.
exit_afterоформителяДалее нам нужна функция для завершения
main()из дочернего потока:def quit_function(fn_name): # print to stderr, unbuffered in Python 2. print('{0} took too long'.format(fn_name), file=sys.stderr) sys.stderr.flush() # Python 3 stderr is likely buffered. thread.interrupt_main() # raises KeyboardInterruptа вот и сам оформитель:
def exit_after(s): ''' use as decorator to exit process if function takes longer than s seconds ''' def outer(fn): def inner(*args, **kwargs): timer = threading.Timer(s, quit_function, args=[fn.__name__]) timer.start() try: result = fn(*args, **kwargs) finally: timer.cancel() return result return inner return outerиспользование
и вот использования, что напрямую отвечает на ваш вопрос о выходе через 5 секунд!:
@exit_after(5) def countdown(n): print('countdown started', flush=True) for i in range(n, -1, -1): print(i, end=', ', flush=True) sleep(1) print('countdown finished')Demo:
>>> countdown(3) countdown started 3, 2, 1, 0, countdown finished >>> countdown(10) countdown started 10, 9, 8, 7, 6, countdown took too long Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 11, in inner File "<stdin>", line 6, in countdown KeyboardInterruptвторой вызов функции не завершится, вместо этого процесс должен выйти с обратной трассировкой!
KeyboardInterruptне всегда останавливает спящий потокобратите внимание, что сон не всегда будет прерван прерыванием клавиатуры, на Python 2 в Windows, например:
@exit_after(1) def sleep10(): sleep(10) print('slept 10 seconds') >>> sleep10() sleep10 took too long # Note that it hangs here about 9 more seconds Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 11, in inner File "<stdin>", line 3, in sleep10 KeyboardInterruptи вряд ли это может прервать выполнение кода расширения, если он явно не проверяет
PyErr_CheckSignals()см. Cython, Python и KeyboardInterrupt игнорируютсяя бы не стал спать поток больше секунды, в любом случае - это Эон в процессорном времени.
как я могу вызвать функцию или что я обернуть его так, что если это займет больше 5 секунд скрипт отменяет его и делает что-то еще?
, чтобы поймать его и сделать что-то еще, вы может поймать KeyboardInterrupt.
>>> try: ... countdown(10) ... except KeyboardInterrupt: ... print('do something else') ... countdown started 10, 9, 8, 7, 6, countdown took too long do something else
У меня есть другое предложение, которое является чистой функцией (с тем же API, что и потоковое предложение) и, похоже, работает нормально (на основе предложений по этому потоку)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None): import signal class TimeoutError(Exception): pass def handler(signum, frame): raise TimeoutError() # set the timeout handler signal.signal(signal.SIGALRM, handler) signal.alarm(timeout_duration) try: result = func(*args, **kwargs) except TimeoutError as exc: result = default finally: signal.alarm(0) return result
я наткнулся на этот поток при поиске тайм-аута вызова модульных тестов. Я не нашел ничего простого в ответах или пакетах сторонних производителей, поэтому я написал декоратор ниже, вы можете перейти прямо в код:
import multiprocessing.pool import functools def timeout(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(item): """Wrap the original function.""" @functools.wraps(item) def func_wrapper(*args, **kwargs): """Closure for function.""" pool = multiprocessing.pool.ThreadPool(processes=1) async_result = pool.apply_async(item, args, kwargs) # raises a TimeoutError if execution exceeds max_timeout return async_result.get(max_timeout) return func_wrapper return timeout_decoratorтогда это так же просто, как тайм-аут теста или любой функции вам нравится:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError def test_base_regression(self): ...
есть много предложений, но ни один из них не используется одновременно.фьючерсы, которые, я думаю, являются наиболее разборчивым способом справиться с этим.
from concurrent.futures import ProcessPoolExecutor # Warning: this does not terminate function if timeout def timeout_five(fnc, *args, **kwargs): with ProcessPoolExecutor() as p: f = p.submit(fnc, *args, **kwargs) return f.result(timeout=5)супер просто читать и поддерживать.
мы создаем пул, отправляем один процесс, а затем ждем до 5 секунд, прежде чем поднимать TimeoutError, который вы могли бы поймать и обработать, как вам нужно.
Native для python 3.2+ и backported до 2.7 (pip install futures).
переключение между потоками и процессы так же просто, как замена
ProcessPoolExecutorСThreadPoolExecutor.Если вы хотите завершить процесс по таймауту, я бы предложил заглянуть в Pebble.
The
stopitпакет, найденный на pypi, похоже, хорошо справляется с таймаутами.мне нравится
@stopit.threading_timeoutableдекоратор, который добавляетtimeoutпараметр к украшенной функции, которая делает то, что вы ожидаете, он останавливает функцию.проверьте это на pypi:https://pypi.python.org/pypi/stopit
#!/usr/bin/python2 import sys, subprocess, threading proc = subprocess.Popen(sys.argv[2:]) timer = threading.Timer(float(sys.argv[1]), proc.terminate) timer.start() proc.wait() timer.cancel() exit(proc.returncode)
у меня была необходимость nestable временные прерывания (которые SIGALARM не может сделать), которые не будут заблокированы временем.сон (что не может сделать потоковый подход). Я закончил тем, что скопировал и слегка изменил код отсюда: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
сам код:
#!/usr/bin/python # lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/ """alarm.py: Permits multiple SIGALRM events to be queued. Uses a `heapq` to store the objects to be called when an alarm signal is raised, so that the next alarm is always at the top of the heap. """ import heapq import signal from time import time __version__ = '$Revision: 2539 $'.split()[1] alarmlist = [] __new_alarm = lambda t, f, a, k: (t + time(), f, a, k) __next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None __set_alarm = lambda: signal.alarm(max(__next_alarm(), 1)) class TimeoutError(Exception): def __init__(self, message, id_=None): self.message = message self.id_ = id_ class Timeout: ''' id_ allows for nested timeouts. ''' def __init__(self, id_=None, seconds=1, error_message='Timeout'): self.seconds = seconds self.error_message = error_message self.id_ = id_ def handle_timeout(self): raise TimeoutError(self.error_message, self.id_) def __enter__(self): self.this_alarm = alarm(self.seconds, self.handle_timeout) def __exit__(self, type, value, traceback): try: cancel(self.this_alarm) except ValueError: pass def __clear_alarm(): """Clear an existing alarm. If the alarm signal was set to a callable other than our own, queue the previous alarm settings. """ oldsec = signal.alarm(0) oldfunc = signal.signal(signal.SIGALRM, __alarm_handler) if oldsec > 0 and oldfunc != __alarm_handler: heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {}))) def __alarm_handler(*zargs): """Handle an alarm by calling any due heap entries and resetting the alarm. Note that multiple heap entries might get called, especially if calling an entry takes a lot of time. """ try: nextt = __next_alarm() while nextt is not None and nextt <= 0: (tm, func, args, keys) = heapq.heappop(alarmlist) func(*args, **keys) nextt = __next_alarm() finally: if alarmlist: __set_alarm() def alarm(sec, func, *args, **keys): """Set an alarm. When the alarm is raised in `sec` seconds, the handler will call `func`, passing `args` and `keys`. Return the heap entry (which is just a big tuple), so that it can be cancelled by calling `cancel()`. """ __clear_alarm() try: newalarm = __new_alarm(sec, func, args, keys) heapq.heappush(alarmlist, newalarm) return newalarm finally: __set_alarm() def cancel(alarm): """Cancel an alarm by passing the heap entry returned by `alarm()`. It is an error to try to cancel an alarm which has already occurred. """ __clear_alarm() try: alarmlist.remove(alarm) heapq.heapify(alarmlist) finally: if alarmlist: __set_alarm()и пример использования:
import alarm from time import sleep try: with alarm.Timeout(id_='a', seconds=5): try: with alarm.Timeout(id_='b', seconds=2): sleep(3) except alarm.TimeoutError as e: print 'raised', e.id_ sleep(30) except alarm.TimeoutError as e: print 'raised', e.id_ else: print 'nope.'
вот небольшое улучшение для данного решения на основе потоков.
приведенный ниже код поддерживает исключения:
def runFunctionCatchExceptions(func, *args, **kwargs): try: result = func(*args, **kwargs) except Exception, message: return ["exception", message] return ["RESULT", result] def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None): import threading class InterruptableThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = default def run(self): self.result = runFunctionCatchExceptions(func, *args, **kwargs) it = InterruptableThread() it.start() it.join(timeout_duration) if it.isAlive(): return default if it.result[0] == "exception": raise it.result[1] return it.result[1]вызов его с 5-секундным таймаутом:
result = timeout(remote_calculate, (myarg,), timeout_duration=5)
мы можем использовать сигналы то же самое. Я думаю, что приведенный ниже пример будет полезен для вас. Это очень просто по сравнению с потоками.
import signal def timeout(signum, frame): raise myException #this is an infinite loop, never ending under normal circumstances def main(): print 'Starting Main ', while 1: print 'in main ', #SIGALRM is only usable on a unix platform signal.signal(signal.SIGALRM, timeout) #change 5 to however many seconds you need signal.alarm(5) try: main() except myException: print "whoops"
большой, простой в использовании и надежный PyPi проект тайм-аут-оформителя (https://pypi.org/project/timeout-decorator/)
установка:
pip install timeout-decoratorиспользование:
import time import timeout_decorator @timeout_decorator.timeout(5) def mytest(): print "Start" for i in range(1,10): time.sleep(1) print "%d seconds have passed" % i if __name__ == '__main__': mytest()
timeout-decoratorне работают на windows system as, windows не поддерживаетsignalхорошо.если вы используете timeout-decorator в системе windows, вы получите следующее
AttributeError: module 'signal' has no attribute 'SIGALRM'некоторые предложили использовать
use_signals=False, но не работал для меня.автор @bitranox создал следующий пакет:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zipКод:
import time from wrapt_timeout_decorator import * @timeout(5) def mytest(message): print(message) for i in range(1,10): time.sleep(1) print('{} seconds have passed'.format(i)) def main(): mytest('starting') if __name__ == '__main__': main()дает следующее исключение:
TimeoutError: Function mytest timed out after 5 seconds
Comments