Список к будущей последовательности
Я пытаюсь преобразовать List<CompletableFuture<X>> до CompletableFuture<List<T>>. Это очень полезно, Так как у вас есть много асинхронных задач, и вам нужно получить результаты всех из них.
если какой-либо из них терпит неудачу, то окончательное будущее терпит неудачу. Вот как я реализовал:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
, чтобы запустить его:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
если какой-либо из них терпит неудачу, то он терпит неудачу. Он дает результат, как ожидалось, даже если есть миллион фьючерсов. Проблема у меня есть: скажем, если есть более 5000 фьючерсов и если кто-то из них терпит неудачу, я получаю StackOverflowError:
исключение в потоке" pool-1-thread-2611 " java.ленг.StackOverflowError
на
Ява.утиль.параллельный.CompletableFuture.internalComplete (CompletableFuture.java:210)
на
Ява.утиль.параллельный.CompletableFuture$ThenCompose.run (CompletableFuture.java: 1487)
на
Ява.утиль.параллельный.CompletableFuture.postComplete (CompletableFuture.java: 193)
на
Ява.утиль.параллельный.CompletableFuture.internalComplete (CompletableFuture.java:210)
на
Ява.утиль.параллельный.CompletableFuture$ThenCompose.run (CompletableFuture.java: 1487)
что я делаю не так?
Примечание: вышеуказанное возвращенное будущее терпит неудачу прямо когда любое из будущего терпит неудачу. Принятый ответ также должен учитывать этот момент.
8 ответов:
использовать
CompletableFuture.allOf(...):static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) { return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(toList()) ); }несколько комментариев к вашей реализации:
использование
.thenComposeAsync,.thenApplyAsyncи.thenCombineAsyncскорее всего, не делает то, что вы ожидаете. Эти...Asyncметоды выполняют функцию, предоставленную им в отдельном потоке. Таким образом, в вашем случае вы вызываете добавление нового элемента в Список для запуска в предоставленном исполнителе. Нет необходимости набивать легкие операции в кэшированный исполнитель потока. Не используйтеthenXXXXAsyncметоды без веской причины.кроме того,
reduceне следует использовать для накопления в изменяемые контейнеры. Даже если он может работать правильно, когда поток является последовательным, он не будет работать, если поток должен быть параллельным. Для выполнения изменяемого сокращения используйте .если вы хотите завершить все вычисления исключительно сразу после первого сбоя, выполните следующие действия в своем
sequenceспособ:CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(toList()) ); com.forEach(f -> f.whenComplete((t, ex) -> { if (ex != null) { result.completeExceptionally(ex); } })); return result;если, кроме того, вы хотите отменить оставшиеся операции при первом сбое, добавить
exec.shutdownNow();сразу послеresult.completeExceptionally(ex);. Это, конечно, предполагает, чтоexecсуществуют только для этого одного вычисления. Если это не так, вам придется перебирать и отменять каждый оставшийсяFutureиндивидуально.
как Миша указал, вы злоупотреблять
…Asyncоперации. Кроме того, вы составляете сложную цепочку операций, моделирующих зависимость, которая не отражает логику вашей программы:
- вы создаете задание x, которое зависит от первого и второго задания вашего списка
- вы создаете задание x+1, которое зависит от задания x и третьего задания вашего списка
- вы создаете задание x+2, которое зависит от задания x+1 и 4-го задания вашего список
- ...
- вы создаете задание x + 5000, которое зависит от задания x + 4999 и последнего задания вашего списка
затем отмена (явно или из-за исключения) этого рекурсивно составленного задания может выполняться рекурсивно и может завершиться ошибкой с
StackOverflowError. Это зависит от реализации.как уже показано Мишей есть метод,
стоит отметить, что даже это не обязательно. Поскольку вы используете неограниченный исполнитель пула потоков, вы можете просто опубликовать асинхронное задание, собирающее результаты в список, и все готово. Ожидание завершения - это подразумевается запрашивая результат каждой работы в любом случае.allOfчто позволяет моделировать ваше первоначальное намерение, чтобы определить один задание, которое зависит от всех заданий вашего списка.ExecutorService executorService = Executors.newCachedThreadPool(); List<CompletableFuture<Integer>> que = IntStream.range(0, 100000) .mapToObj(x -> CompletableFuture.supplyAsync(() -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10))); return x; }, executorService)).collect(Collectors.toList()); CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync( () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()), executorService);использование методов для составления зависимых операций важно, когда количество потоков ограничено и задания могут порождать дополнительные асинхронные задания, чтобы избежать ожидания заданий, крадущих потоки из заданий, которые должны быть завершены в первую очередь, но и здесь не так.
в этом конкретном случае одно задание, просто повторяющее это большое количество предварительных заданий и ожидающее при необходимости, может быть более эффективным, чем моделирование этого большого количества зависимостей и наличие каждого задания для уведомления зависимого задания о завершении.
вы можете получить в Spotify
CompletableFuturesбиблиотеки и использоватьallAsListметод. Я думаю, что это вдохновлено гуавыFutures.allAsListметод.public static <T> CompletableFuture<List<T>> allAsList( List<? extends CompletionStage<? extends T>> stages) {
а вот простая реализация, если вы не хотите использовать библиотеку:
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) { return CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()]) ).thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) ); }
чтобы добавить к принятому ответу @Misha, его можно дополнительно расширить как коллектор:
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() { return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com)); }теперь вы можете:
Stream<CompletableFuture<Integer>> stream = Stream.of( CompletableFuture.completedFuture(1), CompletableFuture.completedFuture(2), CompletableFuture.completedFuture(3) ); CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
в дополнение к библиотеке Spotify Futures вы можете попробовать мой код найти здесь: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (имеет зависимости от других классов в том же пакете)
он реализует логику, чтобы вернуть" по крайней мере N из M " CompletionStage-s с политикой, сколько ошибок разрешено терпеть. Есть удобные методы для всех случаев, плюс политика отмены для остальных фьючерсов, плюс код имеет дело с CompletionStage-s (интерфейс), а не CompletableFuture (конкретный класс).
пример операции последовательности с использованием thenCombine на CompletableFuture
public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){ CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>()); BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;}); BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ; return com.stream() .reduce(identity, combineToList, combineLists); } }Если вы не против использовать сторонние библиотеки Циклоп-реагировать (Я автор) имеет набор полезных методов для CompletableFutures (и Optionals, потоки и т.д.)
List<CompletableFuture<String>> listOfFutures; CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);
Javaslang очень удобно
FutureAPI. Это также позволяет сделать будущее коллекции из коллекции фьючерсов.List<Future<String>> listOfFutures = ... Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);
отказ от ответственности: это не ответ на первоначальный вопрос. Ему будет не хватать части" провалить все, если один провалится". Однако я не могу ответить на фактический, более общий вопрос, потому что он был закрыт как дубликат этого: Java 8 CompletableFuture.все(...) с коллекцией или списком. Поэтому я отвечу здесь:
как преобразовать
List<CompletableFuture<V>>toCompletableFuture<List<V>>С помощью Java 8's stream API?резюме: Используйте следующее:
private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) { CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>()); BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) -> futureValue.thenCombine(futureList, (value, list) -> { List<V> newList = new ArrayList<>(list.size() + 1); newList.addAll(list); newList.add(value); return newList; }); BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> { List<V> newList = new ArrayList<>(list1.size() + list2.size()); newList.addAll(list1); newList.addAll(list2); return newList; }); return listOfFutures.stream().reduce(identity, accumulator, combiner); }пример использования:
List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads) .mapToObj(i -> loadData(i, executor)).collect(toList()); CompletableFuture<List<String>> futureList = sequence(listOfFutures);Пример:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; public class ListOfFuturesToFutureOfList { public static void main(String[] args) { ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList(); test.load(10); } public void load(int numThreads) { final ExecutorService executor = Executors.newFixedThreadPool(numThreads); List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads) .mapToObj(i -> loadData(i, executor)).collect(toList()); CompletableFuture<List<String>> futureList = sequence(listOfFutures); System.out.println("Future complete before blocking? " + futureList.isDone()); // this will block until all futures are completed List<String> data = futureList.join(); System.out.println("Loaded data: " + data); System.out.println("Future complete after blocking? " + futureList.isDone()); executor.shutdown(); } public CompletableFuture<String> loadData(int dataPoint, Executor executor) { return CompletableFuture.supplyAsync(() -> { ThreadLocalRandom rnd = ThreadLocalRandom.current(); System.out.println("Starting to load test data " + dataPoint); try { Thread.sleep(500 + rnd.nextInt(1500)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Successfully loaded test data " + dataPoint); return "data " + dataPoint; }, executor); } private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) { CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>()); BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) -> futureValue.thenCombine(futureList, (value, list) -> { List<V> newList = new ArrayList<>(list.size() + 1); newList.addAll(list); newList.add(value); return newList; }); BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> { List<V> newList = new ArrayList<>(list1.size() + list2.size()); newList.addAll(list1); newList.addAll(list2); return newList; }); return listOfFutures.stream().reduce(identity, accumulator, combiner); } }
Comments