Список к будущей последовательности



Я пытаюсь преобразовать List<CompletableFuture<X>> до CompletableFuture<List<T>>. Это очень полезно, Так как у вас есть много асинхронных задач, и вам нужно получить результаты всех из них.



если какой-либо из них терпит неудачу, то окончательное будущее терпит неудачу. Вот как я реализовал:



  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}


, чтобы запустить его:



ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);


если какой-либо из них терпит неудачу, то он терпит неудачу. Он дает результат, как ожидалось, даже если есть миллион фьючерсов. Проблема у меня есть: скажем, если есть более 5000 фьючерсов и если кто-то из них терпит неудачу, я получаю StackOverflowError:




исключение в потоке" pool-1-thread-2611 " java.ленг.StackOverflowError
на
Ява.утиль.параллельный.CompletableFuture.internalComplete (CompletableFuture.java:210)
на
Ява.утиль.параллельный.CompletableFuture$ThenCompose.run (CompletableFuture.java: 1487)
на
Ява.утиль.параллельный.CompletableFuture.postComplete (CompletableFuture.java: 193)
на
Ява.утиль.параллельный.CompletableFuture.internalComplete (CompletableFuture.java:210)
на
Ява.утиль.параллельный.CompletableFuture$ThenCompose.run (CompletableFuture.java: 1487)




что я делаю не так?



Примечание: вышеуказанное возвращенное будущее терпит неудачу прямо когда любое из будущего терпит неудачу. Принятый ответ также должен учитывать этот момент.

666   8  

8 ответов:

использовать CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(toList())
            );
}

несколько комментариев к вашей реализации:

использование .thenComposeAsync,.thenApplyAsync и .thenCombineAsync скорее всего, не делает то, что вы ожидаете. Эти ...Async методы выполняют функцию, предоставленную им в отдельном потоке. Таким образом, в вашем случае вы вызываете добавление нового элемента в Список для запуска в предоставленном исполнителе. Нет необходимости набивать легкие операции в кэшированный исполнитель потока. Не используйте thenXXXXAsync методы без веской причины.

кроме того, reduce не следует использовать для накопления в изменяемые контейнеры. Даже если он может работать правильно, когда поток является последовательным, он не будет работать, если поток должен быть параллельным. Для выполнения изменяемого сокращения используйте .

если вы хотите завершить все вычисления исключительно сразу после первого сбоя, выполните следующие действия в своем sequence способ:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

если, кроме того, вы хотите отменить оставшиеся операции при первом сбое, добавить exec.shutdownNow(); сразу после result.completeExceptionally(ex);. Это, конечно, предполагает, что exec существуют только для этого одного вычисления. Если это не так, вам придется перебирать и отменять каждый оставшийся Future индивидуально.

как Миша указал, вы злоупотреблять …Async операции. Кроме того, вы составляете сложную цепочку операций, моделирующих зависимость, которая не отражает логику вашей программы:

  • вы создаете задание x, которое зависит от первого и второго задания вашего списка
  • вы создаете задание x+1, которое зависит от задания x и третьего задания вашего списка
  • вы создаете задание x+2, которое зависит от задания x+1 и 4-го задания вашего список
  • ...
  • вы создаете задание x + 5000, которое зависит от задания x + 4999 и последнего задания вашего списка

затем отмена (явно или из-за исключения) этого рекурсивно составленного задания может выполняться рекурсивно и может завершиться ошибкой с StackOverflowError. Это зависит от реализации.

как уже показано Мишей есть метод, allOf что позволяет моделировать ваше первоначальное намерение, чтобы определить один задание, которое зависит от всех заданий вашего списка.

стоит отметить, что даже это не обязательно. Поскольку вы используете неограниченный исполнитель пула потоков, вы можете просто опубликовать асинхронное задание, собирающее результаты в список, и все готово. Ожидание завершения - это подразумевается запрашивая результат каждой работы в любом случае.
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

использование методов для составления зависимых операций важно, когда количество потоков ограничено и задания могут порождать дополнительные асинхронные задания, чтобы избежать ожидания заданий, крадущих потоки из заданий, которые должны быть завершены в первую очередь, но и здесь не так.

в этом конкретном случае одно задание, просто повторяющее это большое количество предварительных заданий и ожидающее при необходимости, может быть более эффективным, чем моделирование этого большого количества зависимостей и наличие каждого задания для уведомления зависимого задания о завершении.

вы можете получить в Spotify CompletableFutures библиотеки и использовать allAsList метод. Я думаю, что это вдохновлено гуавы Futures.allAsList метод.

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

а вот простая реализация, если вы не хотите использовать библиотеку:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}

чтобы добавить к принятому ответу @Misha, его можно дополнительно расширить как коллектор:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

теперь вы можете:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());

в дополнение к библиотеке Spotify Futures вы можете попробовать мой код найти здесь: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (имеет зависимости от других классов в том же пакете)

он реализует логику, чтобы вернуть" по крайней мере N из M " CompletionStage-s с политикой, сколько ошибок разрешено терпеть. Есть удобные методы для всех случаев, плюс политика отмены для остальных фьючерсов, плюс код имеет дело с CompletionStage-s (интерфейс), а не CompletableFuture (конкретный класс).

пример операции последовательности с использованием thenCombine на CompletableFuture

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

   }
} 

Если вы не против использовать сторонние библиотеки Циклоп-реагировать (Я автор) имеет набор полезных методов для CompletableFutures (и Optionals, потоки и т.д.)

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);

Javaslang очень удобно Future API. Это также позволяет сделать будущее коллекции из коллекции фьючерсов.

List<Future<String>> listOfFutures = ... 
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);

см http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

отказ от ответственности: это не ответ на первоначальный вопрос. Ему будет не хватать части" провалить все, если один провалится". Однако я не могу ответить на фактический, более общий вопрос, потому что он был закрыт как дубликат этого: Java 8 CompletableFuture.все(...) с коллекцией или списком. Поэтому я отвечу здесь:

как преобразовать List<CompletableFuture<V>> to CompletableFuture<List<V>> С помощью Java 8's stream API?

резюме: Используйте следующее:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
    CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

    BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
        futureValue.thenCombine(futureList, (value, list) -> {
                List<V> newList = new ArrayList<>(list.size() + 1);
                newList.addAll(list);
                newList.add(value);
                return newList;
            });

    BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
        List<V> newList = new ArrayList<>(list1.size() + list2.size());
        newList.addAll(list1);
        newList.addAll(list2);
        return newList;
    });

    return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

пример использования:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
    .mapToObj(i -> loadData(i, executor)).collect(toList());

CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Пример:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ListOfFuturesToFutureOfList {

    public static void main(String[] args) {
        ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
        test.load(10);
    }

    public void load(int numThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
            .mapToObj(i -> loadData(i, executor)).collect(toList());

        CompletableFuture<List<String>> futureList = sequence(listOfFutures);

        System.out.println("Future complete before blocking? " + futureList.isDone());

        // this will block until all futures are completed
        List<String> data = futureList.join();
        System.out.println("Loaded data: " + data);

        System.out.println("Future complete after blocking? " + futureList.isDone());

        executor.shutdown();
    }

    public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            System.out.println("Starting to load test data " + dataPoint);

            try {
                Thread.sleep(500 + rnd.nextInt(1500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Successfully loaded test data " + dataPoint);

            return "data " + dataPoint;
        }, executor);
    }

    private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
        CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

        BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
            futureValue.thenCombine(futureList, (value, list) -> {
                    List<V> newList = new ArrayList<>(list.size() + 1);
                    newList.addAll(list);
                    newList.add(value);
                    return newList;
                });

        BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
            List<V> newList = new ArrayList<>(list1.size() + list2.size());
            newList.addAll(list1);
            newList.addAll(list2);
            return newList;
        });

        return listOfFutures.stream().reduce(identity, accumulator, combiner);
    }

}

Comments

    Ничего не найдено.