Поймать исключение в потоке, в потоке абонента в Python



Я очень новичок в Python и многопоточном программировании в целом. В принципе, у меня есть скрипт, который будет копировать файлы в другое место. Я хотел бы, чтобы это было помещено в другой поток, чтобы я мог вывести ...., чтобы указать, что скрипт еще работает.



проблема, с которой я сталкиваюсь, заключается в том, что если файлы не могут быть скопированы, это вызовет исключение. Это нормально, если выполняется в основном потоке; однако наличие следующего кода не делает работа:



try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"


в самом классе thread я попытался повторно бросить исключение, но оно не работает. Я видел, как люди здесь задают подобные вопросы, но все они, похоже, делают что-то более конкретное, чем то, что я пытаюсь сделать (и я не совсем понимаю предлагаемые решения). Я видел, как люди упоминают использование sys.exc_info(), но я не знаю, где и как ее использовать.



вся помощь очень ценится!



EDIT: в код для класса thread приведен ниже:



class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder

def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
1075   12  

12 ответов:

проблема в том, что thread_obj.start() сразу возвращается. Дочерний поток, который вы породили, выполняется в своем собственном контексте с собственным стеком. Любое исключение, которое возникает там, находится в контексте дочернего потока, и оно находится в своем собственном стеке. Один из способов, который я могу придумать прямо сейчас, чтобы передать эту информацию родительскому потоку, - это использовать какую-то передачу сообщений, поэтому вы можете посмотреть на это.

Попробуй вот это:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

хотя невозможно напрямую поймать исключение, брошенное в другой поток, вот код, чтобы совершенно прозрачно получить что-то очень близкое к этой функции. Ваш дочерний поток должен подкласс ExThread класс вместо threading.Thread и родительский поток должен вызвать child_thread.join_with_exception() вместо child_thread.join() при ожидании потока, чтобы закончить свою работу.

технические детали этой реализации: когда дочерний поток вызывает исключение, оно передается родитель через Queue и снова бросил в родительский поток. Обратите внимание, что в этом подходе нет занятого ожидания .

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

The concurrent.futures модуль позволяет легко выполнять работу в отдельных потоках( или процессах) и обрабатывать любые возникающие исключения:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures входит в состав Python 3.2 и доступен как backported futures модуль для более ранних версий.

если исключение возникает в потоке, лучший способ-повторно поднять его в вызывающем потоке во время join. Вы можете получить информацию об исключении в настоящее время обрабатывается с помощью . Эта информация может просто храниться как свойство объекта thread до join вызывается, и в этот момент он может быть повторно поднят.

обратите внимание, что a Queue.Queue (как предлагается в других ответах) не нужно в этом простом случае, когда поток бросает на большинство 1 исключение и завершается сразу после вызова исключения. Мы избегаем условий гонки, просто ожидая завершения потока.

например, extend ExcThread (ниже), заменив excRun (вместо run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

форма 3 аргумента для raise ушел в Python 3, поэтому измените последнюю строку на:

raise new_exc.with_traceback(self.exc[2])

есть много действительно странно сложных ответов на этот вопрос. Я слишком упрощаю это, потому что это кажется мне достаточным для большинства вещей.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

Если вы уверены, что вы будете работать только на одной или другой версии Python, вы можете уменьшить run() метод вплоть до только искаженной версии (если вы будете работать только на версиях Python до 3), или только чистая версия (если вы будете работать только на версиях Python, начиная с 3).

пример использования:

def f(*args, **kwargs)
    print(args)
    print(kwargs)
    raise Exception('I suck')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

и вы увидите исключение, возникшее в другом потоке, когда вы присоединитесь.

Это была неприятная маленькая проблема, и я хотел бы бросить свое решение. Некоторые другие решения я нашел (async.io например) выглядел многообещающе, но также представил немного черного ящика. Подход очереди / цикла событий как бы привязывает вас к определенной реализации. параллельный исходный код фьючерсов, однако, составляет около 1000 строк и легко понять. Это позволило мне легко решить мою проблему: создать специальные рабочие потоки без особых настроек и иметь возможность поймать исключения в основном потоке.

мое решение использует параллельный фьючерсный API и потоковый API. Это позволяет вам создать работника, который дает вам как поток, так и будущее. Таким образом, вы можете присоединиться к потоку, чтобы дождаться результата:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...или вы можете позволить работнику просто отправить обратный вызов, когда закончите:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...или вы можете цикл до завершения события:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

вот код:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

...и испытание функция:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

Как noobie к потоку, мне потребовалось много времени, чтобы понять, как реализовать код Матеуша Кобоса (выше). Вот уточненная версия, чтобы помочь понять, как ее использовать.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

аналогичным способом, как RickardSjogren без очереди, системы и т. д. но и без некоторых слушателей сигналов: выполните непосредственно обработчик исключений, который соответствует блоку except.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

только на себя._callback и блок except в run () являются дополнительными к обычной резьбе.Нитка.

один метод, который мне нравится, основан на шаблон Observer. Я определяю класс сигнала, который мой поток использует для создания исключений для слушателей. Он также может использоваться для возврата значений из потоков. Пример:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

У меня нет достаточного опыта работы с потоками, чтобы утверждать, что это абсолютно безопасный метод. Но это сработало для меня, и мне нравится гибкость.

использование голых исключений не является хорошей практикой, потому что вы обычно ловите больше, чем вы торгуетесь.

Я бы предложил изменить except чтобы поймать только исключение, которое вы хотели бы обработать. Я не думаю, что поднятие его имеет желаемый эффект, потому что когда вы идете в instantiate TheThread в наружном try, если он вызывает исключение, назначение никогда не произойдет.

вместо этого вы можете просто предупредить об этом и двигаться дальше, например как:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

затем, когда это исключение будет поймано, вы можете справиться с ним там. Тогда когда внешний try ловит исключение из TheThread, вы знаете, что это не будет тот, который вы уже обработали, и поможет вам изолировать поток процесса.

простой способ перехвата исключения потока и передачи обратно вызывающему методу может быть путем передачи словаря или списка в worker метод.

пример (передача словаря рабочему методу):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

Я знаю, что немного опоздал на вечеринку здесь, но у меня была очень похожая проблема, но она включала использование tkinter в качестве графического интерфейса, а mainloop не позволял использовать ни одно из решений, которые зависят .присоединяться.)( Поэтому я адаптировал решение, данное в редактировании исходного вопроса, но сделал его более общим, чтобы облегчить его понимание для других.

вот новый класс потока в действии:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

конечно, вы всегда можете иметь его обрабатывать исключение какой-либо другой способ ведения журнала, например, его печать или вывод на консоль.

Это позволяет использовать класс ExceptionThread точно так же, как и класс Thread, без каких-либо специальных изменений.

Comments

    Ничего не найдено.