Запуск определенного кода каждые n секунд [дубликат]



этот вопрос уже есть ответ здесь:



есть ли способ, например, распечатать Hello World! каждые n секунд?
Например, программа будет проходить через любой код, который у меня был, а затем, как только это было 5 секунд (с time.sleep()) оно будет выполнять этот код. Я бы использовал это для обновления файла, а не для печати Hello World.



например:



startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds

for i in range(5):
print(i)

>> Hello World!
>> 0
>> 1
>> 2
>> Hello World!
>> 3
>> Hello World!
>> 4
656   7  

7 ответов:

import threading

def printit():
  threading.Timer(5.0, printit).start()
  print "Hello, World!"

printit()

# continue with the rest of your code

мой скромный взгляд на эту тему, обобщение ответа Алекса Мартелли, с управлением start () и stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

характеристики:

  • только стандартная библиотека, никаких внешних зависимостей
  • start() и stop() безопасно вызвать несколько раз, даже если таймер уже запущен/остановлен
  • вызываемая функция может иметь позиционные и именованные аргументы
  • вы можете изменить interval в любое время, это будет эффективно после следующего запуска. То же самое для args,kwargs и даже function!

сохраните себе шизофренический эпизод и используйте расширенный планировщик Python: http://pythonhosted.org/APScheduler

код такой простой:

from apscheduler.scheduler import Scheduler

sched = Scheduler()
sched.start()

def some_job():
    print "Every 10 seconds"

sched.add_interval_job(some_job, seconds = 10)

....
sched.shutdown()
def update():
    import time
    while True:
        print 'Hello World!'
        time.sleep(5)

это будет работать как функция. Элемент while True: заставляет его работать вечно. Вы всегда можете взять его из функции, если вам нужно.

вот простой пример, совместимый с APScheduler 3.00+:

# note that there are many other schedulers available
from apscheduler.schedulers.background import BackgroundScheduler

sched = BackgroundScheduler()

def some_job():
    print('Every 10 seconds')

# seconds can be replaced with minutes, hours, or days
sched.add_job(some_job, 'interval', seconds=10)
sched.start()

...

sched.shutdown()

кроме того, вы можете использовать следующее. В отличие от многих альтернатив, этот таймер будет выполнять нужный код каждый n секунд точно (независимо от времени, которое требуется для выполнения кода). Так что это отличный вариант, если вы не можете позволить себе дрейфовать.

import time
from threading import Event, Thread

class RepeatedTimer:

    """Repeat `function` every `interval` seconds."""

    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start = time.time()
        self.event = Event()
        self.thread = Thread(target=self._target)
        self.thread.start()

    def _target(self):
        while not self.event.wait(self._time):
            self.function(*self.args, **self.kwargs)

    @property
    def _time(self):
        return self.interval - ((time.time() - self.start) % self.interval)

    def stop(self):
        self.event.set()
        self.thread.join()


# start timer
timer = RepeatedTimer(10, print, 'Hello world')

# stop timer
timer.stop()

вот версия, которая не создает новый поток каждый n секунд:

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

событие используется для остановки повторений:

cancel_future_calls = call_repeatedly(5, print, "Hello, World")
# do something else here...
cancel_future_calls() # stop future calls

посмотреть улучшить текущую реализацию setInterval python

вы можете запустить отдельный поток, единственной обязанностью которого является подсчет в течение 5 секунд, обновить файл, повторить. Вы не хотели бы, чтобы этот отдельный поток вмешивался в ваш основной поток.

Comments

    Ничего не найдено.