Как дождаться завершения всех потоков, используя ExecutorService?
Мне нужно выполнить некоторое количество задач 4 одновременно, что-то вроде этого:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
Как я могу получить уведомление, как только все они будут завершены? На данный момент я не могу думать ни о чем лучше, чем установить какой-то глобальный счетчик задач и уменьшить его в конце каждой задачи, а затем контролировать в бесконечном цикле этот счетчик, чтобы стать 0; или получить список фьючерсов и в бесконечном цикле монитор isDone для всех из них. Какие лучшие решения не связаны с бесконечным петли?
спасибо.
22 ответов:
в основном на
ExecutorServiceвы называетеshutdown()а тоawaitTermination():ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { ... }
использовать CountDownLatch за:
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } try { latch.await(); } catch (InterruptedException E) { // handle }и в рамках вашей задачи (заключите в try / finally)
latch.countDown();
ExecutorService.invokeAll()делает это для вас.ExecutorService taskExecutor = Executors.newFixedThreadPool(4); List<Callable<?>> tasks; // your tasks // invokeAll() returns when all tasks are complete List<Future<?>> futures = taskExecutor.invokeAll(tasks);
вы также можете использовать списки фьючерсов:
List<Future> futures = new ArrayList<Future>(); // now add to it: futures.add(executorInstance.submit(new Callable<Void>() { public Void call() throws IOException { // do something return null; } }));затем, когда вы хотите присоединиться ко всем из них, его по существу эквивалент присоединения к каждому (с дополнительным преимуществом, что он повторно вызывает исключения из дочерних потоков в основной):
for(Future f: this.futures) { f.get(); }В основном трюк состоит в том, чтобы позвонить .get () на каждом будущем по одному за раз, вместо бесконечного циклического вызова isDone () on (all или each). Таким образом, Вы гарантированно "двигаться дальше" Через и мимо этого блока, как только последний отделка резьбы. Оговорка заключается в том, что с тех пор .get () вызов повторно вызывает исключения, если один из потоков умирает, вы бы подняли из этого, возможно, до того, как другие потоки закончили до завершения [чтобы избежать этого, вы можете добавить
catch ExecutionExceptionвокруг вызова get]. Другое предостережение заключается в том, что он сохраняет ссылку на все потоки, поэтому, если у них есть локальные переменные потока, они не будут собраны до тех пор, пока вы не пройдете этот блок (хотя вы можете обойти это, если это стало проблемой, удалив Будущее вне ArrayList). Если вы хотите знать, какое будущее "заканчивается первым", вы можете использовать что-то вроде https://stackoverflow.com/a/31885029/32453
просто мои два цента. Чтобы преодолеть требование
CountDownLatchчтобы заранее знать количество задач, вы можете сделать это по-старому, используя простойSemaphore.ExecutorService taskExecutor = Executors.newFixedThreadPool(4); int numberOfTasks=0; Semaphore s=new Semaphore(0); while(...) { taskExecutor.execute(new MyTask()); numberOfTasks++; } try { s.aquire(numberOfTasks); ...в вашей задаче просто позвоните
s.release()как быlatch.countDown();
в Java8 вы можете сделать это с помощью CompletableFuture:
ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown();
The CyclicBarrier класс в Java 5 и более поздних версиях предназначен для такого рода вещей.
немного поздно для игры, но ради завершения...
вместо того, чтобы "ждать" завершения всех задач, вы можете думать в терминах голливудского принципа: "Не звони мне, я позвоню тебе" - когда я закончу. Я думаю, что полученный код является более элегантным...
гуавы предлагает некоторые интересные инструменты для достижения этой цели.
пример:
оберните ExecutorService в ListeningExecutorService::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));отправить коллекция вызываемых объектов для выполнения ::
for (Callable<Integer> callable : callables) { ListenableFuture<Integer> lf = service.submit(callable); // listenableFutures is a collection listenableFutures.add(lf) });теперь существенная часть:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);прикрепите обратный вызов к ListenableFuture, который вы можете использовать, чтобы получать уведомления, когда все фьючерсы завершены ::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() { @Override public void onSuccess(List<Integer> result) { log.info("@@ finished processing {} elements", Iterables.size(result)); // do something with all the results } @Override public void onFailure(Throwable t) { log.info("@@ failed because of :: {}", t); } });Это также имеет то преимущество, что вы можете собрать все результаты в одном месте после завершения обработки...
дополнительная информация здесь
следуйте одному из следующих подходов.
- перебрать все будущее задачи, вернулся из
submitonExecutorServiceи проверить состояние с блокировкой вызоваget()onFutureобъект, предложенныйKiran- использовать
invokeAll()on ExecutorService- CountDownLatch за
- ForkJoinPool или исполнителей.html#newWorkStealingPool
- использовать
shutdown, awaitTermination, shutdownNowAPI ThreadPoolExecutor в правильной последовательностисвязанные вопросы SE:
вы можете обернуть свои задачи в другой runnable, который будет отправлять уведомления:
taskExecutor.execute(new Runnable() { public void run() { taskStartedNotification(); new MyTask().run(); taskFinishedNotification(); } });
Я просто написал пример программы, которая решает вашу проблему. Было дано краткое, поэтому я добавлю один. В то время как вы можете использовать
executor.shutdown()иexecutor.awaitTermination(), это не лучшая практика, так как время, затраченное различными потоками, будет непредсказуемым.ExecutorService es = Executors.newCachedThreadPool(); List<Callable<Integer>> tasks = new ArrayList<>(); for (int j = 1; j <= 10; j++) { tasks.add(new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; System.out.println("Starting Thread " + Thread.currentThread().getId()); for (int i = 0; i < 1000000; i++) { sum += i; } System.out.println("Stopping Thread " + Thread.currentThread().getId()); return sum; } }); } try { List<Future<Integer>> futures = es.invokeAll(tasks); int flag = 0; for (Future<Integer> f : futures) { Integer res = f.get(); System.out.println("Sum: " + res); if (!f.isDone()) flag = 1; } if (flag == 0) System.out.println("SUCCESS"); else System.out.println("FAILED"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
просто чтобы обеспечить больше альтернатив здесь разные использовать защелки / барьеры. Вы также можете получить частичные результаты, пока все они не закончат использовать CompletionService.
из параллелизма Java на практике: "Если у вас есть пакет вычислений для отправки исполнителю, и вы хотите получить их результаты, когда они станут доступный, вы можете сохранить будущее, связанное с каждой задачей, и повторно опросить для завершения, вызвав get with a тайм-аут нулевой. Этот можно, но нудно. К счастью, есть лучше: - услуги завершения."
реализацияpublic class TaskSubmiter { private final ExecutorService executor; TaskSubmiter(ExecutorService executor) { this.executor = executor; } void doSomethingLarge(AnySourceClass source) { final List<InterestedResult> info = doPartialAsyncProcess(source); CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor); for (final InterestedResult interestedResultItem : info) completionService.submit(new Callable<PartialResult>() { public PartialResult call() { return InterestedResult.doAnOperationToGetPartialResult(); } }); try { for (int t = 0, n = info.size(); t < n; t++) { Future<PartialResult> f = completionService.take(); PartialResult PartialResult = f.get(); processThisSegment(PartialResult); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw somethinghrowable(e.getCause()); } } }
вы можете использовать свой собственный подкласс ExecutorCompletionService обертывание
taskExecutor, и ваша собственная реализация BlockingQueue чтобы получить информацию, когда каждая задача завершается и выполнить любой обратный вызов или другое действие, которое вы хотите, когда количество выполненных задач достигает желаемой цели.
вы должны использовать
executorService.shutdown()иexecutorService.awaitTerminationметод.пример следующий:
public class ScheduledThreadPoolExample { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(() -> System.out.println("process task."), 0, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(10); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } }
Java 8 - мы можем использовать stream API для обработки потока. Пожалуйста, смотрите фрагмент ниже
final List<Runnable> tasks = ...; //or any other functional interface tasks.stream().parallel().forEach(Runnable::run) // Uses default pool //alternatively to specify parallelism new ForkJoinPool(15).submit( () -> tasks.stream().parallel().forEach(Runnable::run) ).get();
вы можете использовать этот код:
public class MyTask implements Runnable { private CountDownLatch countDownLatch; public MyTask(CountDownLatch countDownLatch { this.countDownLatch = countDownLatch; } @Override public void run() { try { //Do somethings // this.countDownLatch.countDown();//important } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); for (int i = 0; i < NUMBER_OF_TASKS; i++){ taskExecutor.execute(new MyTask(countDownLatch)); } countDownLatch.await(); System.out.println("Finish tasks");
Это мое решение, основанное на подсказке" AdamSkywalker", и оно работает
package frss.main; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestHilos { void procesar() { ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); System.out.println("FIN DEL PROCESO DE HILOS"); } private List<Runnable> getTasks() { List<Runnable> tasks = new ArrayList<Runnable>(); Hilo01 task1 = new Hilo01(); tasks.add(task1); Hilo02 task2 = new Hilo02(); tasks.add(task2); return tasks; } private class Hilo01 extends Thread { @Override public void run() { System.out.println("HILO 1"); } } private class Hilo02 extends Thread { @Override public void run() { try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("HILO 2"); } } public static void main(String[] args) { TestHilos test = new TestHilos(); test.procesar(); } }
поэтому я публикую свой ответ из связанного вопроса здесь, Если кто-то хочет более простой способ сделать это
ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture[] futures = new CompletableFuture[10]; int i = 0; while (...) { futures[i++] = CompletableFuture.runAsync(runner, executor); } CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
вот два варианта, просто немного запутать, какой из них лучше пойти.
Вариант 1:
ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown();Вариант 2:
ExecutorService es = Executors.newFixedThreadPool(4); List< Future<?>> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future<?> future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } } es.shutdown();Здесь Ставим будущее.get(); в try catch это хорошая идея, верно?
Это может помочь
Log.i(LOG_TAG, "shutting down executor..."); executor.shutdown(); while (true) { try { Log.i(LOG_TAG, "Waiting for executor to terminate..."); if (executor.isTerminated()) break; if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { break; } } catch (InterruptedException ignored) {} }
можно назвать waitTillDone () на Бегун класс:
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool while(...) { runner.run(new MyTask()); // here you submit your task } runner.waitTillDone(); // and this blocks until all tasks are finished (or failed) runner.shutdown(); // once you done you can shutdown the runnerВы можете использовать этот класс и вызов waitTillDone () столько раз, сколько вы хотите перед вызовом shutdown (), плюс ваш код очень просто. И ты не знаю the количество задач авансом.
чтобы использовать его, просто добавьте этот gradle / maven
compile 'com.github.matejtymes:javafixes:1.1.1'зависимость от вашего проекта.больше подробности можно узнать здесь:
https://github.com/MatejTymes/JavaFixes
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
есть метод в executor
getActiveCount()- это дает количество активных потоков.после охвата потока, мы можем проверить, если
activeCount()значение0. Как только значение равно нулю, это означает, что в настоящее время нет активных потоков, что означает, что задача завершена:while (true) { if (executor.getActiveCount() == 0) { //ur own piece of code break; } }
Comments