How to limit execution time of a function call? [duplicate]
Как ограничить время выполнения вызова функции?
В моем коде есть вызов функции, связанной с сокетом, эта функция из другого модуля, таким образом, вне моего контроля, проблема в том, что иногда она блокируется на несколько часов, что совершенно неприемлемо, как я могу ограничить время выполнения функции в моем коде? Я предполагаю, что решение должно использовать другой поток.
Переведено автоматически
Ответ 1
Улучшением ответа @rik.the.vik было бы использование with оператора для предоставления функции timeout некоторого синтаксического сахара:
import signal from contextlib import contextmanager
try: with time_limit(10): long_function_call() except TimeoutException as e: print("Timed out!")
Ответ 2
Я не уверен, насколько кроссплатформенным это может быть, но использование signals и alarm может быть хорошим способом взглянуть на это. Немного поработав, вы могли бы сделать это полностью универсальным и пригодным для использования в любой ситуации.
Вот способ ограничить время выполнения функции в Linux / OSX. Это на случай, если вы не хотите использовать потоки и хотите, чтобы ваша программа ждала завершения функции или истечения срока действия.
from multiprocessing import Process from time import sleep
deff(time): sleep(time)
defrun_with_limited_time(func, args, kwargs, time): """Runs a function with time limit
:param func: The function to run :param args: The functions args, given as tuple :param kwargs: The functions keywords, given as dict :param time: The time limit in seconds :return: True if the function ended successfully. False if it was terminated. """ p = Process(target=func, args=args, kwargs=kwargs) p.start() p.join(time) if p.is_alive(): p.terminate() returnFalse
Я предпочитаю подход context manager, потому что он позволяет выполнять несколько операторов python в with time_limit инструкции. Поскольку в системе Windows нет SIGALARM, более переносимым и, возможно, более простым методом могло бы быть использование Timer
from contextlib import contextmanager import threading import _thread
@contextmanager deftime_limit(seconds, msg=''): timer = threading.Timer(seconds, lambda: _thread.interrupt_main()) timer.start() try: yield except KeyboardInterrupt: raise TimeoutException("Timed out for operation {}".format(msg)) finally: # if the action ends in specified time, timer is canceled timer.cancel()
import time # ends after 5 seconds with time_limit(5, 'sleep'): for i inrange(10): time.sleep(1)
# this will actually end after 10 seconds with time_limit(5, 'sleep'): time.sleep(10)
Ключевым приемом здесь является использование _thread.interrupt_main для прерывания основного потока из потока таймера. Одно предостережение заключается в том, что основной поток не всегда быстро реагирует на запрос, KeyboardInterrupt вызванный Timer. Например, time.sleep() вызывает системную функцию, поэтому a KeyboardInterrupt будет обработан после sleep вызова.