Каков наилучший способ многократного выполнения функции каждые x секунд в Python?
Я хочу повторно выполнять функцию в Python каждые 60 секунд навсегда (так же, как NSTimer В задаче с). Этот код будет работать как демон и фактически похож на вызов скрипта python каждую минуту с помощью cron, но без необходимости его настройки пользователем.
в этом вопросе о cron, реализованном в Python, решение, по-видимому, эффективно просто sleep () в течение x секунд. Мне не нужна такая расширенная функциональность, так возможно, что-то вроде этого будет работать
while True:
# Code executed here
time.sleep(60)
есть ли какой-либо проблемы с этим кодом?
15 ответов:
использовать sched модуль, который реализует планировщик событий общего назначения.
import sched, time s = sched.scheduler(time.time, time.sleep) def do_something(sc): print "Doing stuff..." # do your stuff s.enter(60, 1, do_something, (sc,)) s.enter(60, 1, do_something, (s,)) s.run()
просто заблокируйте свой цикл времени на системные часы. Простой.
import time starttime=time.time() while True: print "tick" time.sleep(60.0 - ((time.time() - starttime) % 60.0))
вы можете рассмотреть витая который является сетевой библиотекой python, которая реализует Схема Реактора.
from twisted.internet import task from twisted.internet import reactor timeout = 60.0 # Sixty seconds def doWork(): #do work here pass l = task.LoopingCall(doWork) l.start(timeout) # call every sixty seconds reactor.run()в то время как "while True: sleep(60)", вероятно, будет работать Twisted, вероятно, уже реализует многие функции, которые вам в конечном итоге понадобятся (демонизация, ведение журнала или обработка исключений, как указано bobince), и, вероятно, будет более надежным решением
если вы хотите, чтобы неблокирующий способ периодически выполнял вашу функцию, вместо блокирующего бесконечного цикла я бы использовал резьбовой таймер. Таким образом, ваш код может продолжать работать и выполнять другие задачи и по-прежнему вызывать вашу функцию каждые n секунд. Я использую эту технику много для печати информации о прогрессе на длинных, CPU / Disk / Network intensive задачах.
вот код, который я опубликовал в аналогичном вопросе, с start() и stop() управление:
from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = Falseиспользование:
from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!характеристики:
- только стандартная библиотека, никаких внешних зависимостей
start()иstop()безопасно вызвать несколько раз, даже если таймер уже запущен/остановлен- вызываемая функция может иметь позиционные и именованные аргументы
- вы можете изменить
intervalв любое время, это будет эффективно после следующего запуска. То же самое дляargs,kwargsи дажеfunction!
более простой способ, мне кажется:
import time def executeSomething(): #code here time.sleep(60) while True: executeSomething()таким образом, ваш код выполняется, затем он ждет 60 секунд, затем он снова выполняется, ждет, выполняет и т. д... Не нужно усложнять вещи: D
вот обновление кода от MestreLion, которое позволяет избежать дрейфа с течением времени:
import threading import time class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.next_call = time.time() self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self.next_call += self.interval self._timer = threading.Timer(self.next_call - time.time(), self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False
import time, traceback def every(delay, task): next_time = time.time() + delay while True: time.sleep(max(0, next_time - time.time())) try: task() except Exception: traceback.print_exc() # in production code you might want to have this instead of course: # logger.exception("Problem while executing repetitive task.") # skip tasks if we are behind schedule: next_time += (time.time() - next_time) // delay * delay + delay def foo(): print("foo", time.time()) every(5, foo)если вы хотите сделать это без блокировки оставшегося кода, Вы можете использовать это, чтобы позволить ему работать в своем собственном потоке:
import threading threading.Thread(target=lambda: every(5, foo)).start()это решение сочетает в себе несколько функций, которые редко встречаются в других решениях:
- обработка исключений: насколько это возможно на этом уровне, исключения обрабатываются правильно, т. е. регистрируются для целей отладки без прерывания нашей программы.
- нет цепочки: Общая цепочечная реализация (для планирования следующего события), которую вы найдете во многих ответах, хрупка в том аспекте, что если что-то пойдет не так в механизме планирования (
threading.Timerили что-то еще), это завершит цепочку. Казней больше не будет, даже если эта проблема уже исправлена. Простой цикл и ожидание с простымsleep()является гораздо более надежным в сравнении.- Не дрейф: мое решение держит точный след время, когда он должен работать. Нет никакого дрейфа в зависимости от времени выполнения (как и во многих других решениях).
- пропуск: мое решение будет пропускать задачи, если одно выполнение заняло слишком много времени (например, делать X каждые пять секунд, но X занял 6 секунд). Это стандартное поведение cron (и по уважительной причине). Многие другие решения тогда просто выполнить задачу несколько раз подряд без каких-либо задержек. В большинстве случаев (например, задачи очистки) это нежелательно. Если это и пожелал, просто использовать .
я столкнулся с аналогичной проблемой некоторое время назад. Может быть http://cronus.readthedocs.org может помочь?
для v0. 2, следующий фрагмент работает
import cronus.beat as beat beat.set_rate(2) # 2 Hz while beat.true(): # do some time consuming work here beat.sleep() # total loop duration would be 0.5 sec
основное различие между этим и cron заключается в том, что исключение убьет демона навсегда. Возможно, вы захотите обернуть с помощью ловца исключений и регистратора.
Я использую это, чтобы вызвать 60 событий в час с большинством событий, происходящих в то же количество секунд после целой минуты:
import math import time import random TICK = 60 # one minute tick size TICK_TIMING = 59 # execute on 59th second of the tick TICK_MINIMUM = 30 # minimum catch up tick size when lagging def set_timing(): now = time.time() elapsed = now - info['begin'] minutes = math.floor(elapsed/TICK) tick_elapsed = now - info['completion_time'] if (info['tick']+1) > minutes: wait = max(0,(TICK_TIMING-(time.time() % TICK))) print ('standard wait: %.2f' % wait) time.sleep(wait) elif tick_elapsed < TICK_MINIMUM: wait = TICK_MINIMUM-tick_elapsed print ('minimum wait: %.2f' % wait) time.sleep(wait) else: print ('skip set_timing(); no wait') drift = ((time.time() - info['begin']) - info['tick']*TICK - TICK_TIMING + info['begin']%TICK) print ('drift: %.6f' % drift) info['tick'] = 0 info['begin'] = time.time() info['completion_time'] = info['begin'] - TICK while 1: set_timing() print('hello world') #random real world event time.sleep(random.random()*TICK_MINIMUM) info['tick'] += 1 info['completion_time'] = time.time()в зависимости от реальных условий вы можете получить тики длины:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.но в конце 60 минут у вас будет 60 ТИКов; и большинство из них будет происходить при правильном смещении в ту минуту, которую вы предпочитаете.
в моей системе я получаю типичный дрейф
преимуществом этого метода является разрешение дрейфа часов; что может вызвать проблемы, если вы делаете такие вещи, как добавление одного элемента на ТИК, и вы ожидаете, что 60 элементов добавляются в час. Отсутствие учета дрейфа может привести к тому, что вторичные индикаторы, такие как скользящие средние, будут рассматривать данные слишком глубоко в прошлом, что приведет к ошибочному выходу.
один возможный ответ:
import time t=time.time() while True: if time.time()-t>10: #run your task here t=time.time()
например, отображать текущее местное время
import datetime import glib import logger def get_local_time(): current_time = datetime.datetime.now().strftime("%H:%M") logger.info("get_local_time(): %s",current_time) return str(current_time) def display_local_time(): logger.info("Current time is: %s", get_local_time()) return True # call every minute glib.timeout_add(60*1000, display_local_time)
Я использую метод Tkinter after (), который не "крадет игру" (например, sched модуль, который был представлен ранее), т. е. он позволяет другим вещам работать параллельно:
import Tkinter def do_something1(): global n1 n1 += 1 if n1 == 6: # (Optional condition) print "* do_something1() is done *"; return # Do your stuff here # ... print "do_something1() "+str(n1) tk.after(1000, do_something1) def do_something2(): global n2 n2 += 1 if n2 == 6: # (Optional condition) print "* do_something2() is done *"; return # Do your stuff here # ... print "do_something2() "+str(n2) tk.after(500, do_something2) tk = Tkinter.Tk(); n1 = 0; n2 = 0 do_something1() do_something2() tk.mainloop()
do_something1()иdo_something2()может работать параллельно и в любом интервале скорости. Здесь второй будет выполнен в два раза быстрее.Обратите внимание также, что я использовал простой счетчик в качестве условия для завершения любой функции. Вы можете использовать любой другой contition вам нравится или нет, если вы что a функция для запуска до завершения работы программы (например, часы).
''' tracking number of times it prints''' import threading global timeInterval count=0 def printit(): threading.Timer(timeInterval, printit).start() print( "Hello, World!") global count count=count+1 print(count) printit if __name__ == "__main__": timeInterval= int(input('Enter Time in Seconds:')) printit()
вот адаптированная версия кода от MestreLion. В дополнение к исходной функции, этот код:
1) добавьте first_interval, используемый для запуска таймера в определенное время (вызывающий должен вычислить first_interval и передать)
2) решите условие гонки в исходном коде. В исходном коде, если поток управления не смог отменить запущенный таймер ("остановите таймер и отмените выполнение действия таймера. Это будет работать только если таймер все еще находится в его этап ожидания."цитата из https://docs.python.org/2/library/threading.html), таймер будет работать бесконечно.
class RepeatedTimer(object): def __init__(self, first_interval, interval, func, *args, **kwargs): self.timer = None self.first_interval = first_interval self.interval = interval self.func = func self.args = args self.kwargs = kwargs self.running = False self.is_started = False def first_start(self): try: # no race-condition here because only control thread will call this method # if already started will not start again if not self.is_started: self.is_started = True self.timer = Timer(self.first_interval, self.run) self.running = True self.timer.start() except Exception as e: log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc())) raise def run(self): # if not stopped start again if self.running: self.timer = Timer(self.interval, self.run) self.timer.start() self.func(*self.args, **self.kwargs) def stop(self): # cancel current timer in case failed it's still OK # if already stopped doesn't matter to stop again if self.timer: self.timer.cancel() self.running = False
Comments