В ожидании списка будущего
у меня есть метод, который возвращает List фьючерсы
List<Future<O>> futures = getFutures();
теперь я хочу подождать, пока либо все фьючерсы не будут успешно обработаны, либо любая из задач, выход которых возвращается будущим, вызывает исключение. Даже если одна задача вызывает исключение, нет смысла ждать других фьючерсов.
простой подход к
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
но проблема здесь в том, что если, например, 4-е будущее выбрасывает исключение, то я буду подождите без необходимости, пока первые 3 фьючерса будут доступны.
как решить эту проблему? Отсчет защелки поможет в любом случае? Я не могу использовать Future isDone потому что Java doc говорит
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
8 ответов:
вы можете использовать CompletionService чтобы получить фьючерсы, как только они будут готовы, и если один из них выдает исключение, отмените обработку. Что-то вроде этого:
Executor executor = Executors.newFixedThreadPool(4); CompletionService<SomeResult> completionService = new ExecutorCompletionService<SomeResult>(executor); //4 tasks for(int i = 0; i < 4; i++) { completionService.submit(new Callable<SomeResult>() { public SomeResult call() { ... return result; } }); } int received = 0; boolean erros = false; while(received < 4 && !errors) { Future<SomeResult> resultFuture = completionService.take(); //blocks if none available try { SomeResult result = resultFuture.get(); received ++; ... // do something with the result } catch(Exception e) { //log errors = true; } }Я думаю, что вы можете дополнительно улучшить, чтобы отменить все еще выполняющиеся задачи, если один из них выдает ошибку.
EDIT: я нашел более полный пример здесь:http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/
Если вы используете Java 8 тогда вы можете сделать это проще с CompletableFuture и CompletableFuture.все, который применяет обратный вызов только после того, как все поставляемые CompletableFutures сделаны.
// Waits for all futures to complete and returns a list of results. // If a future completes exceptionally then the resulting future will too. public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) { CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]); return CompletableFuture.allOf(cfs) .thenApply(() -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); }
можно использовать ExecutorCompletionService. В документации даже есть пример для вашего точного использования:
предположим вместо этого, что вы хотели бы использовать первый ненулевой результат набора задач, игнорируя любые возникающие исключения и отменяя все другие задачи, когда первый готов:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) { } } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); }здесь важно отметить, что ecs.take() получит первый завершено задач, не только первый представленный. Таким образом, вы должны получить их в порядок завершения исполнения (или исключение).
использовать
CompletableFutureв Java 8// Kick of multiple, asynchronous lookups CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1"); CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2"); CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3"); // Wait until they are all done CompletableFuture.allOf(page1,page2,page3).join(); logger.info("--> " + page1.get());
может быть, это поможет (ничто не будет заменено сырой нитью, да!) Я предлагаю запустить каждый
Futureпарень с разделенным потоком (они идут параллельно), а затем, когда когда-либо одна из полученных ошибок, она просто сигнализирует менеджеру(Handlerкласс).class Handler{ //... private Thread thisThread; private boolean failed=false; private Thread[] trds; public void waitFor(){ thisThread=Thread.currentThread(); List<Future<Object>> futures = getFutures(); trds=new Thread[futures.size()]; for (int i = 0; i < trds.length; i++) { RunTask rt=new RunTask(futures.get(i), this); trds[i]=new Thread(rt); } synchronized (this) { for(Thread tx:trds){ tx.start(); } } for(Thread tx:trds){ try {tx.join(); } catch (InterruptedException e) { System.out.println("Job failed!");break; } }if(!failed){System.out.println("Job Done");} } private List<Future<Object>> getFutures() { return null; } public synchronized void cancelOther(){if(failed){return;} failed=true; for(Thread tx:trds){ tx.stop();//Deprecated but works here like a boss }thisThread.interrupt(); } //... } class RunTask implements Runnable{ private Future f;private Handler h; public RunTask(Future f,Handler h){this.f=f;this.h=h;} public void run(){ try{ f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation) }catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();} catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");} } }Я должен сказать, что приведенный выше код будет ошибкой (не проверял), но я надеюсь, что смогу объяснить решение. пожалуйста, попробуйте.
если вы используете Java 8 и не хочу манипулировать
CompletableFutures, Я написал инструмент для получения результатов дляList<Future<T>>С помощью потоковой передачи. Ключ в том, что вам запрещеноmap(Future::get)как он бросает.public final class Futures { private Futures() {} public static <E> Collector<Future<E>, Collection<E>, List<E>> present() { return new FutureCollector<>(); } private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>> { private final List<Throwable> exceptions = new LinkedList<>(); @Override public Supplier<Collection<T>> supplier() { return LinkedList::new; } @Override public BiConsumer<Collection<T>, Future<T>> accumulator() { return (r, f) -> { try { r.add(f.get()); } catch (InterruptedException e) {} catch (ExecutionException e) { exceptions.add(e.getCause()); } }; } @Override public BinaryOperator<Collection<T>> combiner() { return (l1, l2) -> { l1.addAll(l2); return l1; }; } @Override public Function<Collection<T>, List<T>> finisher() { return l -> { List<T> ret = new ArrayList<>(l); if (!exceptions.isEmpty()) throw new AggregateException(exceptions, ret); return ret; }; } @Override public Set<java.util.stream.Collector.Characteristics> characteristics() { return java.util.Collections.emptySet(); } }это нужно
AggregateExceptionэто работает как C# ' spublic class AggregateException extends RuntimeException { /** * */ private static final long serialVersionUID = -4477649337710077094L; private final List<Throwable> causes; private List<?> successfulElements; public AggregateException(List<Throwable> causes, List<?> l) { this.causes = causes; successfulElements = l; } public AggregateException(List<Throwable> causes) { this.causes = causes; } @Override public synchronized Throwable getCause() { return this; } public List<Throwable> getCauses() { return causes; } public List<?> getSuccessfulElements() { return successfulElements; } public void setSuccessfulElements(List<?> successfulElements) { this.successfulElements = successfulElements; } }этот компонент действует точно так же, как C#'S задач.WaitAll. Я работаю над вариантом, который делает то же самое, что и
CompletableFuture.allOf(эквивалентноTask.WhenAll)почему я сделал это, что я использую весной
ListenableFutureи не хочу портитьCompletableFutureнесмотря на то, что это более стандартный способ
/** * execute suppliers as future tasks then wait / join for getting results * @param functors a supplier(s) to execute * @return a list of results */ private List getResultsInFuture(Supplier<?>... functors) { CompletableFuture[] futures = stream(functors) .map(CompletableFuture::supplyAsync) .collect(Collectors.toList()) .toArray(new CompletableFuture[functors.length]); CompletableFuture.allOf(futures).join(); return stream(futures).map(a-> { try { return a.get(); } catch (InterruptedException | ExecutionException e) { //logger.error("an error occurred during runtime execution a function",e); return null; } }).collect(Collectors.toList()); };
CompletionService примет ваши Callables с собой .submit() метод, и вы можете получить вычисленные фьючерсы с помощью.метод take ().
одна вещь, которую вы не должны забывать, чтобы завершить ExecutorService, вызвав .метод shutdown (). Также вы можете вызвать этот метод только тогда, когда вы сохранили ссылку на службу исполнителя, поэтому убедитесь, что она сохранилась.
пример кода-для фиксированного количества рабочих элементов, над которыми необходимо работать параллель:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<YourCallableImplementor> completionService = new ExecutorCompletionService<YourCallableImplementor>(service); ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>(); for (String computeMe : elementsToCompute) { futures.add(completionService.submit(new YourCallableImplementor(computeMe))); } //now retrieve the futures after computation (auto wait for it) int received = 0; while(received < elementsToCompute.size()) { Future<YourCallableImplementor> resultFuture = completionService.take(); YourCallableImplementor result = resultFuture.get(); received ++; } //important: shutdown your ExecutorService service.shutdown();пример кода-для динамического количества рабочих элементов, которые будут работать параллельно:
public void runIt(){ ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service); ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>(); //Initial workload is 8 threads for (int i = 0; i < 9; i++) { futures.add(completionService.submit(write.new CallableImplementor())); } boolean finished = false; while (!finished) { try { Future<CallableImplementor> resultFuture; resultFuture = completionService.take(); CallableImplementor result = resultFuture.get(); finished = doSomethingWith(result.getResult()); result.setResult(null); result = null; resultFuture = null; //After work package has been finished create new work package and add it to futures futures.add(completionService.submit(write.new CallableImplementor())); } catch (InterruptedException | ExecutionException e) { //handle interrupted and assert correct thread / work packet count } } //important: shutdown your ExecutorService service.shutdown(); } public class CallableImplementor implements Callable{ boolean result; @Override public CallableImplementor call() throws Exception { //business logic goes here return this; } public boolean getResult() { return result; } public void setResult(boolean result) { this.result = result; } }
Comments