Преобразовать Java в будущем в класс CompletableFuture
Java 8 вводит CompletableFuture, новая реализация будущего, которая является составной (включает в себя кучу методов thenXxx). Я хотел бы использовать это исключительно, но многие из библиотек, которые я хочу использовать, возвращают только некомпозиционные Future экземпляров.
есть ли способ обернуть возвращенный Future экземпляры внутри a CompleteableFuture чтобы я мог его сочинить?
5 ответов:
есть способ, но вам это не понравится. Следующий метод преобразует
Future<T>наCompletableFuture<T>:public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) { return CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException|ExecutionException e) { throw new RuntimeException(e); } }); }очевидно, проблема с этим подходом заключается в том, что для каждого будущее, поток будет заблокирован, чтобы дождаться результата будущее--противоречит идее будущего. В некоторых случаях это может быть возможно сделать лучше. Однако, в целом, нет решения без активного ожидания результата будущее.
если библиотека, которую вы хотите использовать, также предлагает метод стиля обратного вызова в дополнение к будущему стилю, вы можете предоставить ему обработчик, который завершает CompletableFuture без какой-либо дополнительной блокировки потока. Вот так:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file")); // ... CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>(); open.read(buffer, position, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { completableFuture.complete(buffer); } @Override public void failed(Throwable exc, Void attachment) { completableFuture.completeExceptionally(exc); } }); completableFuture.thenApply(...)без обратного вызова единственный другой способ, которым я вижу решение этой проблемы, - использовать цикл опроса, который помещает все ваши
Future.isDone()проверяет один поток, а затем вызывает complete всякий раз, когда будущее является gettable.
я опубликовал немного будущее проект, который пытается сделать лучше, чем самый простой способ в ответ.
основная идея заключается в использовании только одного потока (и, конечно, не только с циклом спина) для проверки всех состояний фьючерсов внутри, что помогает избежать блокировки потока из пула для каждого преобразования Future -> CompletableFuture.
пример использования:
Future oldFuture = ...; CompletableFuture profit = Futurity.shift(oldFuture);
позвольте мне предложить другой (надеюсь, лучший) вариант: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent
вкратце, идея заключается в следующем:
- вводим
CompletableTask<V>интерфейс -- СоюзCompletionStage<V>+RunnableFuture<V>- деформация
ExecutorServiceвернутьсяCompletableTaskСsubmit(...)методы (вместоFuture<V>)- сделано, мы имеем runnable и составные фьючерсы.
реализация использует альтернативную реализацию CompletionStage (обратите внимание, CompletionStage а не CompletableFuture):
использование:
J8ExecutorService exec = J8Executors.newCachedThreadPool(); CompletionStage<String> = exec .submit( someCallableA ) .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b) .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
предложение:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
но, в основном:
public class CompletablePromiseContext { private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor(); public static void schedule(Runnable r) { SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS); } }и, CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> { private Future<V> future; public CompletablePromise(Future<V> future) { this.future = future; CompletablePromiseContext.schedule(this::tryToComplete); } private void tryToComplete() { if (future.isDone()) { try { complete(future.get()); } catch (InterruptedException e) { completeExceptionally(e); } catch (ExecutionException e) { completeExceptionally(e.getCause()); } return; } if (future.isCancelled()) { cancel(true); return; } CompletablePromiseContext.schedule(this::tryToComplete); } }пример:
public class Main { public static void main(String[] args) { final ExecutorService service = Executors.newSingleThreadExecutor(); final Future<String> stringFuture = service.submit(() -> "success"); final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture); completableFuture.whenComplete((result, failure) -> { System.out.println(result); }); } }
Comments