В ожидании списка будущего



у меня есть метод, который возвращает List фьючерсы



List<Future<O>> futures = getFutures();


теперь я хочу подождать, пока либо все фьючерсы не будут успешно обработаны, либо любая из задач, выход которых возвращается будущим, вызывает исключение. Даже если одна задача вызывает исключение, нет смысла ждать других фьючерсов.



простой подход к



wait() {

For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}


но проблема здесь в том, что если, например, 4-е будущее выбрасывает исключение, то я буду подождите без необходимости, пока первые 3 фьючерса будут доступны.



как решить эту проблему? Отсчет защелки поможет в любом случае? Я не могу использовать Future isDone потому что Java doc говорит



boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
690   8  

8 ответов:

вы можете использовать CompletionService чтобы получить фьючерсы, как только они будут готовы, и если один из них выдает исключение, отмените обработку. Что-то вроде этого:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean erros = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

Я думаю, что вы можете дополнительно улучшить, чтобы отменить все еще выполняющиеся задачи, если один из них выдает ошибку.

EDIT: я нашел более полный пример здесь:http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/

Если вы используете Java 8 тогда вы можете сделать это проще с CompletableFuture и CompletableFuture.все, который применяет обратный вызов только после того, как все поставляемые CompletableFutures сделаны.

// Waits for all futures to complete and returns a list of results.
// If a future completes exceptionally then the resulting future will too.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(() -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

можно использовать ExecutorCompletionService. В документации даже есть пример для вашего точного использования:

предположим вместо этого, что вы хотели бы использовать первый ненулевой результат набора задач, игнорируя любые возникающие исключения и отменяя все другие задачи, когда первый готов:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

здесь важно отметить, что ecs.take() получит первый завершено задач, не только первый представленный. Таким образом, вы должны получить их в порядок завершения исполнения (или исключение).

использовать CompletableFutureв Java 8

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

может быть, это поможет (ничто не будет заменено сырой нитью, да!) Я предлагаю запустить каждый Future парень с разделенным потоком (они идут параллельно), а затем, когда когда-либо одна из полученных ошибок, она просто сигнализирует менеджеру(Handler класс).

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

Я должен сказать, что приведенный выше код будет ошибкой (не проверял), но я надеюсь, что смогу объяснить решение. пожалуйста, попробуйте.

если вы используете Java 8 и не хочу манипулировать CompletableFutures, Я написал инструмент для получения результатов для List<Future<T>> С помощью потоковой передачи. Ключ в том, что вам запрещено map(Future::get) как он бросает.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

это нужно AggregateException это работает как C# ' s

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

этот компонент действует точно так же, как C#'S задач.WaitAll. Я работаю над вариантом, который делает то же самое, что и CompletableFuture.allOf (эквивалентно Task.WhenAll)

почему я сделал это, что я использую весной ListenableFuture и не хочу портить CompletableFuture несмотря на то, что это более стандартный способ

 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };

CompletionService примет ваши Callables с собой .submit() метод, и вы можете получить вычисленные фьючерсы с помощью.метод take ().

одна вещь, которую вы не должны забывать, чтобы завершить ExecutorService, вызвав .метод shutdown (). Также вы можете вызвать этот метод только тогда, когда вы сохранили ссылку на службу исполнителя, поэтому убедитесь, что она сохранилась.

пример кода-для фиксированного количества рабочих элементов, над которыми необходимо работать параллель:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

пример кода-для динамического количества рабочих элементов, которые будут работать параллельно:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}

Comments

    Ничего не найдено.