как получить возвращаемое значение из потока в Python?



как получить значение 'foo', который возвращается из потока?



from threading import Thread

def foo(bar):
print 'hello {}'.format(bar)
return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret


один очевидный способ сделать это, как показано выше, возвращает None.

15023   20  

20 ответов:

FWIW, the multiprocessing модуль имеет хороший интерфейс для этого с помощью Pool класса. И если вы хотите придерживаться темы, а не процессов, вы можете просто использовать multiprocessing.pool.ThreadPool класс как падени-в замене.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

один из способов, который я видел, - передать изменяемый объект, такой как список или словарь, конструктору потока вместе с индексом или другим идентификатором какого-либо рода. Затем поток может хранить свои результаты в своем выделенном слоте в этом объекте. Например:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

если вы действительно хотите join() вернуть возвращаемое значение вызываемой функции, вы можете сделать это с помощью Thread подкласс следующим образом:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

это становится немного волосатым из-за некоторых имя искажения, и он обращается к" частным " структурам данных, которые являются специфичными для Thread реализация... но это работает.

для python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return

ответ Джейка хорош, но если вы не хотите использовать threadpool (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), то хорошим способом передачи информации между потоками является встроенный очереди.Очередь класс, так как он обеспечивает потокобезопасность.

я создал следующий декоратор, чтобы заставить его действовать аналогично threadpool:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

тогда вы просто используете его как:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

декорированные функция создает новый поток каждый раз, когда он вызывается и возвращает объект потока, который содержит очередь, которая получит результат.

обновление

прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он все еще получает представления, поэтому я подумал, что обновлю его, чтобы отразить, как я это делаю в более новых версиях Python:

Python 3.2 добавлен в concurrent.futures модуль, который обеспечивает интерфейс высокого уровня для параллельных задач. Он обеспечивает ThreadPoolExecutor и ProcessPoolExecutor, так что вы можете использовать поток или пул процессов с тем же api.

одним из преимуществ этого api является то, что отправка задачи в Executor возвращает a Future объект, который будет дополнен возвращаемым значением вызываемого объекта, который вы отправляете.

это делает присоединение queue объект ненужный, что упрощает декоратор совсем немного:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

это будет использовать значение по умолчанию модуль ThreadPool executor, если он не передается.

использование очень похоже на предыдущее:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

если вы используете Python 3.4+, одна действительно хорошая особенность использования этого метода (и будущих объектов в целом) заключается в том, что возвращаемое будущее может быть обернуто, чтобы превратить его в asyncio.Future С asyncio.wrap_future. Это позволяет легко работать с сопрограммами:

result = await asyncio.wrap_future(long_task(10))

Если вам не нужен доступ к базовому

другое решение, которое не требует изменения существующего кода:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

его можно также легко отрегулировать к многопоточной окружающей среде:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result

Паррис / kindall это!--5-->ответjoin/return ответ портирован на Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

внимание Thread класс реализован по-разному в Python 3.

я украл ответ kindall и очистил его немного.

ключевой частью является добавление *args и * * kwargs для соединения () для обработки таймаута

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

ОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ

это мой самый популярный ответ, поэтому я решил обновить код, который будет работать как на py2, так и на py3.

кроме того, я вижу много ответов на этот вопрос из-за отсутствия понимания относительно резьбы.присоединяться.)( Некоторые полностью не справляются с timeout arg. Но есть также угловой случай, который вы должны знать о случаях, когда у вас есть (1) целевая функция, которая может возвращать None и (2) вы также передаете timeout arg, чтобы присоединиться (). Пожалуйста, смотрите "тест 4", чтобы понять этот угловой случай.

ThreadWithReturn класс, который работает с py2 и py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

некоторые примеры тестов показаны ниже:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

можете ли вы определить угловой случай, который мы можем возможно, встреча с тестом 4?

проблема в том, что мы ожидаем, что giveMe() вернет None (см. Тест 2), но мы также ожидаем, что join() вернет None, если он истекает.

returned is None означает:

(1) вот что giveMe () вернул, или

(2) Время ожидания соединения ()

этот пример тривиален, так как мы знаем, что giveMe() всегда будет возвращать None. Но в реальном случае (где цель может законно вернуть None или что-то еще) мы хотели бы явно проверить, что произошло.

ниже показано, как решить этот угловой случай:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()

мое решение проблемы заключается в том, чтобы обернуть функцию и поток в класс. Не требует использования пулов,очередей или передачи переменных типа C. Он также не блокирует. Вместо этого вы проверяете статус. См. пример его использования в конце кода.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()

Используя Очереди :

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())

вы можете определить изменяемый над областью действия потоковой функции и добавить к этому результат. (Я также изменил код, чтобы быть python3 совместимы)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

возвращает {'world!': 'foo'}

Если вы используете функцию ввода в качестве ключа к вашим результатам dict, каждый уникальный вход гарантированно даст запись в результатах

вы можете использовать пул в качестве пула рабочих процессов, как показано ниже:

from multiprocessing import Pool


def f1(x, y):
    return x*y


if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)

принимая во внимание @iman комментировать @JakeBiesinger ответ я перекомпоновал его, чтобы иметь различное количество потоков:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

спасибо,

парень.

join всегда возвращает None, Я думаю, что вы должны подкласс Thread для обработки кодов возврата и так далее.

я использую эту обертку, которая удобно превращает любую функцию для запуска в Thread - забота о его возвращаемое значение или исключение. Это не добавляет Queue накладные расходы.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Примеры Использования

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Примечания threading модуль

удобная обработка возвращаемого значения и исключений для потоковой функции является частой" Питонической " потребностью и действительно должна быть предложена threading модуль - возможно непосредственно в стандарте Thread класса. ThreadPool имеет слишком много накладных расходов для простых задач - 3 управление потоками, много бюрократии. К сожалению Threadмакет был скопирован с Java изначально - который вы видите, например, из все еще бесполезного 1-го (!) параметр конструктора group.

Как уже упоминалось, многопроцессорный пул намного медленнее, чем базовая резьба. Использование очередей, как предлагается в некоторых ответах, является очень эффективной альтернативой. Я использую его со словарями, чтобы иметь возможность запускать много небольших потоков и восстанавливать несколько ответов, объединяя их со словарями:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)

определите свою цель в
1) принимают аргумент q
2) заменить любые заявления return foo С q.put(foo); return

так что функция

def func(a):
    ans = a * a
    return ans

станет

def func(a, q):
    ans = a * a
    q.put(ans)
    return

и тогда вы действовали бы как таковой

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

и вы можете использовать функции декораторы / обертки, чтобы сделать его так, что вы можете использовать существующие функции как target не изменяя их, но следуйте этой основной схеме.

одно обычное решение-обернуть вашу функцию foo с декоратором, как

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

тогда весь код может выглядеть так

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Примечание

одна важная проблема заключается в том, что возвращаемые значения могут быть unorderred. (На самом деле,return value - это не обязательно сохраняется в queue, так как вы можете выбрать произвольный потокобезопасным структура данных )

почему бы просто не использовать глобальную переменную?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)

очень простой способ сделать это для таких манекенов, как я:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self):
        # in this class and function we will put our test target function
        test()

t = AnyThread()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run()
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5

главное здесь - это queue модуль. Мы создаем queue.Queue() экземпляр и включить его в нашу функцию. Мы кормим его своим результатом, который позже мы получаем за пределами нити.

см. Еще один пример с аргументами, переданными нашей тестовой функции:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self, a, b):
        # in this class and function we will put our execution test function
        test(a, b)

t = AnyThread()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run(3+i, 2+i)
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9

идея GuySoft отличная, но я думаю, что объект не обязательно должен наследовать от Thread и start () может быть удален из интерфейса:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo

Kindall это В Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return

Comments

    Ничего не найдено.