Преобразовать Java в будущем в класс CompletableFuture



Java 8 вводит CompletableFuture, новая реализация будущего, которая является составной (включает в себя кучу методов thenXxx). Я хотел бы использовать это исключительно, но многие из библиотек, которые я хочу использовать, возвращают только некомпозиционные Future экземпляров.



есть ли способ обернуть возвращенный Future экземпляры внутри a CompleteableFuture чтобы я мог его сочинить?

637   5  

5 ответов:

есть способ, но вам это не понравится. Следующий метод преобразует Future<T> на CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (InterruptedException|ExecutionException e) {
            throw new RuntimeException(e);
        }
    });
}

очевидно, проблема с этим подходом заключается в том, что для каждого будущее, поток будет заблокирован, чтобы дождаться результата будущее--противоречит идее будущего. В некоторых случаях это может быть возможно сделать лучше. Однако, в целом, нет решения без активного ожидания результата будущее.

если библиотека, которую вы хотите использовать, также предлагает метод стиля обратного вызова в дополнение к будущему стилю, вы можете предоставить ему обработчик, который завершает CompletableFuture без какой-либо дополнительной блокировки потока. Вот так:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

без обратного вызова единственный другой способ, которым я вижу решение этой проблемы, - использовать цикл опроса, который помещает все ваши Future.isDone() проверяет один поток, а затем вызывает complete всякий раз, когда будущее является gettable.

я опубликовал немного будущее проект, который пытается сделать лучше, чем самый простой способ в ответ.

основная идея заключается в использовании только одного потока (и, конечно, не только с циклом спина) для проверки всех состояний фьючерсов внутри, что помогает избежать блокировки потока из пула для каждого преобразования Future -> CompletableFuture.

пример использования:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);

позвольте мне предложить другой (надеюсь, лучший) вариант: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent

вкратце, идея заключается в следующем:

  1. вводим CompletableTask<V> интерфейс -- Союз CompletionStage<V> + RunnableFuture<V>
  2. деформация ExecutorService вернуться CompletableTask С submit(...) методы (вместо Future<V>)
  3. сделано, мы имеем runnable и составные фьючерсы.

реализация использует альтернативную реализацию CompletionStage (обратите внимание, CompletionStage а не CompletableFuture):

использование:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 

предложение:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

но, в основном:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

и, CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

пример:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}

Comments

    Ничего не найдено.