Как ограничить время выполнения вызова функции в Python



в моем коде есть вызов функции, связанной с сокетом, эта функция из другого модуля, таким образом, вне моего контроля, проблема в том, что она блокируется в течение нескольких часов, что совершенно неприемлемо, как я могу ограничить время выполнения функции из моего кода? Я думаю, что решение должно использовать другой поток.

1108   9  

9 ответов:

Я не уверен, насколько кросс-платформенным это может быть, но использование сигналов и тревоги может быть хорошим способом взглянуть на это. С небольшой работой вы могли бы сделать это полностью общим, а также и использовать в любой ситуации.

http://docs.python.org/library/signal.html

Так что ваш код будет выглядеть примерно так.

import signal

def signal_handler(signum, frame):
    raise Exception("Timed out!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(10)   # Ten seconds
try:
    long_function_call()
except Exception, msg:
    print "Timed out!"

улучшение на @rik.этот.ответ Вика был бы использовать with сообщении чтобы дать функции тайм-аута некоторый синтаксический сахар:

import signal
from contextlib import contextmanager

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


try:
    with time_limit(10):
        long_function_call()
except TimeoutException as e:
    print("Timed out!")

вот Linux / OSX способ ограничить время выполнения функции. Это в случае, если вы не хотите использовать потоки и хотите, чтобы ваша программа ждала, пока функция не закончится или не истечет срок действия.

from multiprocessing import Process
from time import sleep

def f(time):
    sleep(time)


def run_with_limited_time(func, args, kwargs, time):
    """Runs a function with time limit

    :param func: The function to run
    :param args: The functions args, given as tuple
    :param kwargs: The functions keywords, given as dict
    :param time: The time limit in seconds
    :return: True if the function ended successfully. False if it was terminated.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        return False

    return True


if __name__ == '__main__':
    print run_with_limited_time(f, (1.5, ), {}, 2.5) # True
    print run_with_limited_time(f, (3.5, ), {}, 2.5) # False

Я предпочитаю подход контекстного менеджера, потому что он позволяет выполнять несколько операторов python в пределах with time_limit заявление. Потому что система windows не имеет SIGALARM, более портативный и, возможно, более простой метод может использовать Timer

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    def __init__(self, msg=''):
        self.msg = msg

@contextmanager
def time_limit(seconds, msg=''):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        raise TimeoutException("Timed out for operation {}".format(msg))
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

import time
# ends after 5 seconds
with time_limit(5, 'sleep'):
    for i in range(10):
        time.sleep(1)

# this will actually end after 10 seconds
with time_limit(5, 'sleep'):
    time.sleep(10)

ключевой метод здесь является использование _thread.interrupt_main чтобы прервать основной поток из потока таймера. Один нюанс заключается в том, что основной поток не всегда отвечает на KeyboardInterrupt поднял Timer быстро. Например, time.sleep() вызывает системную функцию so a KeyboardInterrupt будет обрабатываться после sleep звонок.

делать это из обработчика сигналов опасно: вы можете быть внутри обработчика исключений в момент возникновения исключения и оставить вещи в сломанном состоянии. Например,

def function_with_enforced_timeout():
  f = open_temporary_file()
  try:
   ...
  finally:
   here()
   unlink(f.filename)

Если ваш исключение здесь(), временный файл никогда не будет удален.

решение здесь заключается в том, что асинхронные исключения должны быть отложены до тех пор, пока код не будет находиться внутри кода обработки исключений (блок except или finally), но Python этого не делает что.

обратите внимание, что это ничего не прерывает при выполнении собственного кода; он будет прерывать его только при возврате функции, поэтому это может не помочь в этом конкретном случае. (Сам SIGALRM может прервать вызов, который блокирует,но код сокета обычно просто повторяет попытку после EINTR.)

делать это с потоками-лучшая идея, так как она более портативна, чем сигналы. Поскольку вы запускаете рабочий поток и блокируете его до тех пор, пока он не закончится, нет ни одного из обычный параллелизм беспокоит. К сожалению, нет способа асинхронно доставить исключение в другой поток в Python (другие API потоков могут это сделать). Он также будет иметь ту же проблему с отправкой исключения во время обработчика исключений и потребует того же исправления.

вы не должны использовать потоки. Вы можете использовать другой процесс для выполнения блокирующей работы, например, возможно, используя подпроцесс модуль. Если вы хотите разделить структуры данных между различными частями вашей программы, то витая это отличная библиотека для того, чтобы дать себе контроль над этим, и я бы рекомендовал ее, если вы заботитесь о блокировке и ожидаете, что у вас будет много проблем. Плохая новость с Twisted заключается в том, что вам нужно переписать свой код, чтобы избежать блокировки, и существует Справедливая кривая обучения.

вы можете используйте потоки, чтобы избежать блокировки, но я бы расценил это как последнее средство, так как оно подвергает вас целому миру боли. Прочитайте хорошую книгу о параллелизме, прежде чем даже думать об использовании потоков в производстве, например, "параллельные системы"Джина Бэкона. Я работаю с кучей людей, которые делают действительно классные высокопроизводительные вещи с потоками, и мы не вводим потоки в проекты, если они нам действительно не нужны.

единственный "безопасный" способ сделать это, на любом языке, заключается в использовании вторичного процесса для выполнения этого тайм-аута, иначе вам нужно построить свой код таким образом, чтобы он безопасно тайм-аут сам по себе, например, проверяя время, прошедшее в цикле или аналогичном. Если изменение метода не является опцией, потока будет недостаточно.

Почему? Потому что вы рискуете оставить вещи в плохом состоянии, когда вы делаете. Если поток просто убит в середине метода, блокировки удерживаются и т. д. будет просто держитесь и не можете освободиться.

Итак, посмотрите на процесс, сделайте не посмотрите на путь потока.

вот функция тайм-аута, которую я нашел через google, и она работает для меня.

From: http://code.activestate.com/recipes/473878/

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    '''This function will spwan a thread and run the given function using the args, kwargs and 
    return the given default value if the timeout_duration is exceeded 
    ''' 
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            try:
                self.result = func(*args, **kwargs)
            except:
                self.result = default
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return it.result
    else:
        return it.result   

Я обычно предпочитаю использовать contextmanager, как это предлагает @josh-lee

но если кто-то заинтересован в том, чтобы это было реализовано в качестве декоратора, вот альтернатива.

вот как это будет выглядеть:

import time
from timeout import timeout

class Test(object):
    @timeout(2)
    def test_a(self, foo, bar):
        print foo
        time.sleep(1)
        print bar
        return 'A Done'

    @timeout(2)
    def test_b(self, foo, bar):
        print foo
        time.sleep(3)
        print bar
        return 'B Done'

t = Test()
print t.test_a('python', 'rocks')
print t.test_b('timing', 'out')

и это timeout.py модуль:

import threading

class TimeoutError(Exception):
    pass

class InterruptableThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        threading.Thread.__init__(self)
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._result = None

    def run(self):
        self._result = self._func(*self._args, **self._kwargs)

    @property
    def result(self):
        return self._result


class timeout(object):
    def __init__(self, sec):
        self._sec = sec

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            it = InterruptableThread(f, *args, **kwargs)
            it.start()
            it.join(self._sec)
            if not it.is_alive():
                return it.result
            raise TimeoutError('execution expired')
        return wrapped_f

вывод:

python
rocks
A Done
timing
Traceback (most recent call last):
  ...
timeout.TimeoutError: execution expired
out

обратите внимание, что даже если TimeoutError бросается, украшенный метод будет продолжать работать в другом потоке. Если бы вы могли также хотите, чтобы этот поток был "остановлен" см.:есть ли способ убить поток в Python?

Comments

    Ничего не найдено.