Запуск определенного кода каждые n секунд [дубликат]
этот вопрос уже есть ответ здесь:
есть ли способ, например, распечатать Hello World! каждые n секунд?
Например, программа будет проходить через любой код, который у меня был, а затем, как только это было 5 секунд (с time.sleep()) оно будет выполнять этот код. Я бы использовал это для обновления файла, а не для печати Hello World.
например:
startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds
for i in range(5):
print(i)
>> Hello World!
>> 0
>> 1
>> 2
>> Hello World!
>> 3
>> Hello World!
>> 4
7 ответов:
import threading def printit(): threading.Timer(5.0, printit).start() print "Hello, World!" printit() # continue with the rest of your code
мой скромный взгляд на эту тему, обобщение ответа Алекса Мартелли, с управлением start () и stop ():
from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = Falseиспользование:
from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!характеристики:
- только стандартная библиотека, никаких внешних зависимостей
start()иstop()безопасно вызвать несколько раз, даже если таймер уже запущен/остановлен- вызываемая функция может иметь позиционные и именованные аргументы
- вы можете изменить
intervalв любое время, это будет эффективно после следующего запуска. То же самое дляargs,kwargsи дажеfunction!
сохраните себе шизофренический эпизод и используйте расширенный планировщик Python: http://pythonhosted.org/APScheduler
код такой простой:
from apscheduler.scheduler import Scheduler sched = Scheduler() sched.start() def some_job(): print "Every 10 seconds" sched.add_interval_job(some_job, seconds = 10) .... sched.shutdown()
def update(): import time while True: print 'Hello World!' time.sleep(5)это будет работать как функция. Элемент
while True:заставляет его работать вечно. Вы всегда можете взять его из функции, если вам нужно.
вот простой пример, совместимый с APScheduler 3.00+:
# note that there are many other schedulers available from apscheduler.schedulers.background import BackgroundScheduler sched = BackgroundScheduler() def some_job(): print('Every 10 seconds') # seconds can be replaced with minutes, hours, or days sched.add_job(some_job, 'interval', seconds=10) sched.start() ... sched.shutdown()кроме того, вы можете использовать следующее. В отличие от многих альтернатив, этот таймер будет выполнять нужный код каждый n секунд точно (независимо от времени, которое требуется для выполнения кода). Так что это отличный вариант, если вы не можете позволить себе дрейфовать.
import time from threading import Event, Thread class RepeatedTimer: """Repeat `function` every `interval` seconds.""" def __init__(self, interval, function, *args, **kwargs): self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.start = time.time() self.event = Event() self.thread = Thread(target=self._target) self.thread.start() def _target(self): while not self.event.wait(self._time): self.function(*self.args, **self.kwargs) @property def _time(self): return self.interval - ((time.time() - self.start) % self.interval) def stop(self): self.event.set() self.thread.join() # start timer timer = RepeatedTimer(10, print, 'Hello world') # stop timer timer.stop()
вот версия, которая не создает новый поток каждый
nсекунд:from threading import Event, Thread def call_repeatedly(interval, func, *args): stopped = Event() def loop(): while not stopped.wait(interval): # the first call is in `interval` secs func(*args) Thread(target=loop).start() return stopped.setсобытие используется для остановки повторений:
cancel_future_calls = call_repeatedly(5, print, "Hello, World") # do something else here... cancel_future_calls() # stop future calls
вы можете запустить отдельный поток, единственной обязанностью которого является подсчет в течение 5 секунд, обновить файл, повторить. Вы не хотели бы, чтобы этот отдельный поток вмешивался в ваш основной поток.
Comments