как получить возвращаемое значение из потока в Python?
как получить значение 'foo', который возвращается из потока?
from threading import Thread
def foo(bar):
print 'hello {}'.format(bar)
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret
один очевидный способ сделать это, как показано выше, возвращает None.
20 ответов:
FWIW, the
multiprocessingмодуль имеет хороший интерфейс для этого с помощьюPoolкласса. И если вы хотите придерживаться темы, а не процессов, вы можете просто использоватьmultiprocessing.pool.ThreadPoolкласс как падени-в замене.def foo(bar, baz): print 'hello {0}'.format(bar) return 'foo' + baz from multiprocessing.pool import ThreadPool pool = ThreadPool(processes=1) async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo # do some other stuff in the main process return_val = async_result.get() # get the return value from your function.
один из способов, который я видел, - передать изменяемый объект, такой как список или словарь, конструктору потока вместе с индексом или другим идентификатором какого-либо рода. Затем поток может хранить свои результаты в своем выделенном слоте в этом объекте. Например:
def foo(bar, result, index): print 'hello {0}'.format(bar) result[index] = "foo" from threading import Thread threads = [None] * 10 results = [None] * 10 for i in range(len(threads)): threads[i] = Thread(target=foo, args=('world!', results, i)) threads[i].start() # do some other stuff for i in range(len(threads)): threads[i].join() print " ".join(results) # what sound does a metasyntactic locomotive make?если вы действительно хотите
join()вернуть возвращаемое значение вызываемой функции, вы можете сделать это с помощьюThreadподкласс следующим образом:from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return "foo" class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): Thread.__init__(self, group, target, name, args, kwargs, Verbose) self._return = None def run(self): if self._Thread__target is not None: self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) def join(self): Thread.join(self) return self._return twrv = ThreadWithReturnValue(target=foo, args=('world!',)) twrv.start() print twrv.join() # prints fooэто становится немного волосатым из-за некоторых имя искажения, и он обращается к" частным " структурам данных, которые являются специфичными для
Threadреализация... но это работает.для python3
class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): Thread.__init__(self, group, target, name, args, kwargs) self._return = None def run(self): print(type(self._target)) if self._target is not None: self._return = self._target(*self._args, **self._kwargs) def join(self, *args): Thread.join(self, *args) return self._return
ответ Джейка хорош, но если вы не хотите использовать threadpool (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), то хорошим способом передачи информации между потоками является встроенный очереди.Очередь класс, так как он обеспечивает потокобезопасность.
я создал следующий декоратор, чтобы заставить его действовать аналогично threadpool:
def threaded(f, daemon=False): import Queue def wrapped_f(q, *args, **kwargs): '''this function calls the decorated function and puts the result in a queue''' ret = f(*args, **kwargs) q.put(ret) def wrap(*args, **kwargs): '''this is the function returned from the decorator. It fires off wrapped_f in a new thread and returns the thread object with the result queue attached''' q = Queue.Queue() t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs) t.daemon = daemon t.start() t.result_queue = q return t return wrapтогда вы просто используете его как:
@threaded def long_task(x): import time x = x + 5 time.sleep(5) return x # does not block, returns Thread object y = long_task(10) print y # this blocks, waiting for the result result = y.result_queue.get() print resultдекорированные функция создает новый поток каждый раз, когда он вызывается и возвращает объект потока, который содержит очередь, которая получит результат.
обновление
прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он все еще получает представления, поэтому я подумал, что обновлю его, чтобы отразить, как я это делаю в более новых версиях Python:
Python 3.2 добавлен в
concurrent.futuresмодуль, который обеспечивает интерфейс высокого уровня для параллельных задач. Он обеспечиваетThreadPoolExecutorиProcessPoolExecutor, так что вы можете использовать поток или пул процессов с тем же api.одним из преимуществ этого api является то, что отправка задачи в
Executorвозвращает aFutureобъект, который будет дополнен возвращаемым значением вызываемого объекта, который вы отправляете.это делает присоединение
queueобъект ненужный, что упрощает декоратор совсем немного:_DEFAULT_POOL = ThreadPoolExecutor() def threadpool(f, executor=None): @wraps(f) def wrap(*args, **kwargs): return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs) return wrapэто будет использовать значение по умолчанию модуль ThreadPool executor, если он не передается.
использование очень похоже на предыдущее:
@threadpool def long_task(x): import time x = x + 5 time.sleep(5) return x # does not block, returns Future object y = long_task(10) print y # this blocks, waiting for the result result = y.result() print resultесли вы используете Python 3.4+, одна действительно хорошая особенность использования этого метода (и будущих объектов в целом) заключается в том, что возвращаемое будущее может быть обернуто, чтобы превратить его в
asyncio.FutureСasyncio.wrap_future. Это позволяет легко работать с сопрограммами:result = await asyncio.wrap_future(long_task(10))Если вам не нужен доступ к базовому
другое решение, которое не требует изменения существующего кода:
import Queue from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return 'foo' que = Queue.Queue() t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) t.start() t.join() result = que.get() print resultего можно также легко отрегулировать к многопоточной окружающей среде:
import Queue from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return 'foo' que = Queue.Queue() threads_list = list() t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) t.start() threads_list.append(t) # Add more threads here ... threads_list.append(t2) ... threads_list.append(t3) ... # Join all the threads for t in threads_list: t.join() # Check thread's return value while not que.empty(): result = que.get() print result
Паррис / kindall это!--5-->ответ
join/returnответ портирован на Python 3:from threading import Thread def foo(bar): print('hello {0}'.format(bar)) return "foo" class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon) self._return = None def run(self): if self._target is not None: self._return = self._target(*self._args, **self._kwargs) def join(self): Thread.join(self) return self._return twrv = ThreadWithReturnValue(target=foo, args=('world!',)) twrv.start() print(twrv.join()) # prints fooвнимание
Threadкласс реализован по-разному в Python 3.
я украл ответ kindall и очистил его немного.
ключевой частью является добавление *args и * * kwargs для соединения () для обработки таймаута
class threadWithReturn(Thread): def __init__(self, *args, **kwargs): super(threadWithReturn, self).__init__(*args, **kwargs) self._return = None def run(self): if self._Thread__target is not None: self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) def join(self, *args, **kwargs): super(threadWithReturn, self).join(*args, **kwargs) return self._returnОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ
это мой самый популярный ответ, поэтому я решил обновить код, который будет работать как на py2, так и на py3.
кроме того, я вижу много ответов на этот вопрос из-за отсутствия понимания относительно резьбы.присоединяться.)( Некоторые полностью не справляются с
timeoutarg. Но есть также угловой случай, который вы должны знать о случаях, когда у вас есть (1) целевая функция, которая может возвращатьNoneи (2) вы также передаетеtimeoutarg, чтобы присоединиться (). Пожалуйста, смотрите "тест 4", чтобы понять этот угловой случай.ThreadWithReturn класс, который работает с py2 и py3:
import sys from threading import Thread from builtins import super # https://stackoverflow.com/a/30159479 if sys.version_info >= (3, 0): _thread_target_key = '_target' _thread_args_key = '_args' _thread_kwargs_key = '_kwargs' else: _thread_target_key = '_Thread__target' _thread_args_key = '_Thread__args' _thread_kwargs_key = '_Thread__kwargs' class ThreadWithReturn(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._return = None def run(self): target = getattr(self, _thread_target_key) if not target is None: self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key)) def join(self, *args, **kwargs): super().join(*args, **kwargs) return self._returnнекоторые примеры тестов показаны ниже:
import time, random # TEST TARGET FUNCTION def giveMe(arg, seconds=None): if not seconds is None: time.sleep(seconds) return arg # TEST 1 my_thread = ThreadWithReturn(target=giveMe, args=('stringy',)) my_thread.start() returned = my_thread.join() # (returned == 'stringy') # TEST 2 my_thread = ThreadWithReturn(target=giveMe, args=(None,)) my_thread.start() returned = my_thread.join() # (returned is None) # TEST 3 my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5}) my_thread.start() returned = my_thread.join(timeout=2) # (returned is None) # because join() timed out before giveMe() finished # TEST 4 my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5}) my_thread.start() returned = my_thread.join(timeout=random.randint(1, 10))можете ли вы определить угловой случай, который мы можем возможно, встреча с тестом 4?
проблема в том, что мы ожидаем, что giveMe() вернет None (см. Тест 2), но мы также ожидаем, что join() вернет None, если он истекает.
returned is Noneозначает:(1) вот что giveMe () вернул, или
(2) Время ожидания соединения ()
этот пример тривиален, так как мы знаем, что giveMe() всегда будет возвращать None. Но в реальном случае (где цель может законно вернуть None или что-то еще) мы хотели бы явно проверить, что произошло.
ниже показано, как решить этот угловой случай:
# TEST 4 my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5}) my_thread.start() returned = my_thread.join(timeout=random.randint(1, 10)) if my_thread.isAlive(): # returned is None because join() timed out # this also means that giveMe() is still running in the background pass # handle this based on your app's logic else: # join() is finished, and so is giveMe() # BUT we could also be in a race condition, so we need to update returned, just in case returned = my_thread.join()
мое решение проблемы заключается в том, чтобы обернуть функцию и поток в класс. Не требует использования пулов,очередей или передачи переменных типа C. Он также не блокирует. Вместо этого вы проверяете статус. См. пример его использования в конце кода.
import threading class ThreadWorker(): ''' The basic idea is given a function create an object. The object can then run the function in a thread. It provides a wrapper to start it,check its status,and get data out the function. ''' def __init__(self,func): self.thread = None self.data = None self.func = self.save_data(func) def save_data(self,func): '''modify function to save its returned data''' def new_func(*args, **kwargs): self.data=func(*args, **kwargs) return new_func def start(self,params): self.data = None if self.thread is not None: if self.thread.isAlive(): return 'running' #could raise exception here #unless thread exists and is alive start or restart it self.thread = threading.Thread(target=self.func,args=params) self.thread.start() return 'started' def status(self): if self.thread is None: return 'not_started' else: if self.thread.isAlive(): return 'running' else: return 'finished' def get_results(self): if self.thread is None: return 'not_started' #could return exception else: if self.thread.isAlive(): return 'running' else: return self.data def add(x,y): return x +y add_worker = ThreadWorker(add) print add_worker.start((1,2,)) print add_worker.status() print add_worker.get_results()
Используя Очереди :
import threading, queue def calc_square(num, out_queue1): l = [] for x in num: l.append(x*x) out_queue1.put(l) arr = [1,2,3,4,5,6,7,8,9,10] out_queue1=queue.Queue() t1=threading.Thread(target=calc_square, args=(arr,out_queue1)) t1.start() t1.join() print (out_queue1.get())
вы можете определить изменяемый над областью действия потоковой функции и добавить к этому результат. (Я также изменил код, чтобы быть python3 совместимы)
returns = {} def foo(bar): print('hello {0}'.format(bar)) returns[bar] = 'foo' from threading import Thread t = Thread(target=foo, args=('world!',)) t.start() t.join() print(returns)возвращает
{'world!': 'foo'}Если вы используете функцию ввода в качестве ключа к вашим результатам dict, каждый уникальный вход гарантированно даст запись в результатах
вы можете использовать пул в качестве пула рабочих процессов, как показано ниже:
from multiprocessing import Pool def f1(x, y): return x*y if __name__ == '__main__': with Pool(processes=10) as pool: result = pool.apply(f1, (2, 3)) print(result)
принимая во внимание @iman комментировать @JakeBiesinger ответ я перекомпоновал его, чтобы иметь различное количество потоков:
from multiprocessing.pool import ThreadPool def foo(bar, baz): print 'hello {0}'.format(bar) return 'foo' + baz numOfThreads = 3 results = [] pool = ThreadPool(numOfThreads) for i in range(0, numOfThreads): results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo) # do some other stuff in the main process # ... # ... results = [r.get() for r in results] print results pool.close() pool.join()спасибо,
парень.
joinвсегда возвращаетNone, Я думаю, что вы должны подклассThreadдля обработки кодов возврата и так далее.
я использую эту обертку, которая удобно превращает любую функцию для запуска в
Thread- забота о его возвращаемое значение или исключение. Это не добавляетQueueнакладные расходы.def threading_func(f): """Decorator for running a function in a thread and handling its return value or exception""" def start(*args, **kw): def run(): try: th.ret = f(*args, **kw) except: th.exc = sys.exc_info() def get(timeout=None): th.join(timeout) if th.exc: raise th.exc[0], th.exc[1], th.exc[2] # py2 ##raise th.exc[1] #py3 return th.ret th = threading.Thread(None, run) th.exc = None th.get = get th.start() return th return startПримеры Использования
def f(x): return 2.5 * x th = threading_func(f)(4) print("still running?:", th.is_alive()) print("result:", th.get(timeout=1.0)) @threading_func def th_mul(a, b): return a * b th = th_mul("text", 2.5) try: print(th.get()) except TypeError: print("exception thrown ok.")Примечания
threadingмодульудобная обработка возвращаемого значения и исключений для потоковой функции является частой" Питонической " потребностью и действительно должна быть предложена
threadingмодуль - возможно непосредственно в стандартеThreadкласса.ThreadPoolимеет слишком много накладных расходов для простых задач - 3 управление потоками, много бюрократии. К сожалениюThreadмакет был скопирован с Java изначально - который вы видите, например, из все еще бесполезного 1-го (!) параметр конструктораgroup.
Как уже упоминалось, многопроцессорный пул намного медленнее, чем базовая резьба. Использование очередей, как предлагается в некоторых ответах, является очень эффективной альтернативой. Я использую его со словарями, чтобы иметь возможность запускать много небольших потоков и восстанавливать несколько ответов, объединяя их со словарями:
#!/usr/bin/env python3 import threading # use Queue for python2 import queue import random LETTERS = 'abcdefghijklmnopqrstuvwxyz' LETTERS = [ x for x in LETTERS ] NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] def randoms(k, q): result = dict() result['letter'] = random.choice(LETTERS) result['number'] = random.choice(NUMBERS) q.put({k: result}) threads = list() q = queue.Queue() results = dict() for name in ('alpha', 'oscar', 'yankee',): threads.append( threading.Thread(target=randoms, args=(name, q)) ) threads[-1].start() _ = [ t.join() for t in threads ] while not q.empty(): results.update(q.get()) print(results)
определите свою цель в
1) принимают аргументq
2) заменить любые заявленияreturn fooСq.put(foo); returnтак что функция
def func(a): ans = a * a return ansстанет
def func(a, q): ans = a * a q.put(ans) returnи тогда вы действовали бы как таковой
from Queue import Queue from threading import Thread ans_q = Queue() arg_tups = [(i, ans_q) for i in xrange(10)] threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups] _ = [t.start() for t in threads] _ = [t.join() for t in threads] results = [q.get() for _ in xrange(len(threads))]и вы можете использовать функции декораторы / обертки, чтобы сделать его так, что вы можете использовать существующие функции как
targetне изменяя их, но следуйте этой основной схеме.
одно обычное решение-обернуть вашу функцию
fooс декоратором, какresult = queue.Queue() def task_wrapper(*args): result.put(target(*args))тогда весь код может выглядеть так
result = queue.Queue() def task_wrapper(*args): result.put(target(*args)) threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list] for t in threads: t.start() while(True): if(len(threading.enumerate()) < max_num): break for t in threads: t.join() return resultПримечание
одна важная проблема заключается в том, что возвращаемые значения могут быть unorderred. (На самом деле,
return value- это не обязательно сохраняется вqueue, так как вы можете выбрать произвольный потокобезопасным структура данных )
почему бы просто не использовать глобальную переменную?
import threading class myThread(threading.Thread): def __init__(self, ind, lock): threading.Thread.__init__(self) self.ind = ind self.lock = lock def run(self): global results with self.lock: results.append(self.ind) results = [] lock = threading.Lock() threads = [myThread(x, lock) for x in range(1, 4)] for t in threads: t.start() for t in threads: t.join() print(results)
очень простой способ сделать это для таких манекенов, как я:
import queue import threading # creating queue instance q = queue.Queue() # creating threading class class AnyThread(): def __init__ (self): threading.Thread.__init__(self) def run(self): # in this class and function we will put our test target function test() t = AnyThread() # having our test target function def test(): # do something in this function: result = 3 + 2 # and put result to a queue instance q.put(result) for i in range(3): #calling our threading fucntion 3 times (just for example) t.run() output = q.get() # here we get output from queue instance print(output) >>> 5 >>> 5 >>> 5главное здесь - это
queueмодуль. Мы создаемqueue.Queue()экземпляр и включить его в нашу функцию. Мы кормим его своим результатом, который позже мы получаем за пределами нити.см. Еще один пример с аргументами, переданными нашей тестовой функции:
import queue import threading # creating queue instance q = queue.Queue() # creating threading class class AnyThread(): def __init__ (self): threading.Thread.__init__(self) def run(self, a, b): # in this class and function we will put our execution test function test(a, b) t = AnyThread() # having our test target function def test(a, b): # do something in this function: result = a + b # and put result to a queue instance q.put(result) for i in range(3): #calling our threading fucntion 3 times (just for example) t.run(3+i, 2+i) output = q.get() # here we get output from queue instance print(output) >>> 5 >>> 7 >>> 9
идея GuySoft отличная, но я думаю, что объект не обязательно должен наследовать от Thread и start () может быть удален из интерфейса:
from threading import Thread import queue class ThreadWithReturnValue(object): def __init__(self, target=None, args=(), **kwargs): self._que = queue.Queue() self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) , args=(self._que, args, kwargs), ) self._t.start() def join(self): self._t.join() return self._que.get() def foo(bar): print('hello {0}'.format(bar)) return "foo" twrv = ThreadWithReturnValue(target=foo, args=('world!',)) print(twrv.join()) # prints foo
Kindall это В Python3
class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): Thread.__init__(self, group, target, name, args, kwargs, daemon) self._return = None def run(self): try: if self._target: self._return = self._target(*self._args, **self._kwargs) finally: del self._target, self._args, self._kwargs def join(self,timeout=None): Thread.join(self,timeout) return self._return
Comments