RxJava Выборка Наблюдаемых Параллельно
Мне нужна помощь в реализации параллельных асинхронных вызовов в RxJava. Я выбрал простой случай использования, в котором первый вызов извлекает (а не ищет) список продуктов (плитка), которые будут отображаться. Последующие вызовы выходят и получают (а) обзоры и (б) изображения продукта
после нескольких попыток я добрался до этого места.
1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
2 List<Tile> allTiles = new ArrayList<Tile>();
3 ClientResponse response = new ClientResponse();
4 searchTile.parallel(oTile -> {
5 return oTile.flatMap(t -> {
6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());
8 return Observable.zip(reviews, imageUrl, (r, u) -> {
9 t.setReviews(r);
10 t.setImageUrl(u);
11 return t;
12 });
13 });
14 }).subscribe(e -> {
15 allTiles.add((Tile) e);
16 });
строка 1: выходит и выбирает продукт (плитку) для отображения
строка 4: Берем список наблюдаемых и осколок его, чтобы получить отзывы и imageUrls
ложь 6,7: принести заметный комментарий и наблюдаемые адрес
строка 8: Наконец, 2 наблюдаемые молнии, чтобы вернуть обновленный Observable
строка 15: наконец строка 15 сопоставляет все отдельные продукты, которые будут отображаться в коллекции, которая может быть возвращена обратно в вызывающий слой
в то время как наблюдаемое было разделено и в наших тестах выполняется более 4 различных потоков; выборка обзоров и образы, кажется, идут один за другим. Я подозреваю, что шаг zip на линии 8 в основном вызывает последовательный вызов 2 наблюдаемых (обзоры и url).

есть ли у этой группы какие-либо предложения для параллельной выборки reiews и URL-адресов изображений. По сути, диаграмма водопада, прикрепленная выше, должна выглядеть более вертикально уложенной. Звонки на отзывы и изображения должны быть параллельно
спасибо
Ананд Раман
2 ответов:
параллельный оператор оказался проблемой почти для всех случаев использования и не делает то, что большинство ожидает от него, поэтому он был удален в 1.0.0.дистанционное управление.4 релиз:https://github.com/ReactiveX/RxJava/pull/1716
хороший пример того, как сделать этот тип поведения и получить параллельное выполнение можно увидеть здесь.
в вашем примере непонятно, если
searchServiceClientсинхронным или асинхронным. Это влияет на то, как решить проблему немного, как будто это уже асинхронно, никакого дополнительного планирования не требуется. Если требуется Синхронное дополнительное планирование.во-первых, вот несколько простых примеров, показывающих синхронное и асинхронное поведение:
import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecution { public static void main(String[] args) { System.out.println("------------ mergingAsync"); mergingAsync(); System.out.println("------------ mergingSync"); mergingSync(); System.out.println("------------ mergingSyncMadeAsync"); mergingSyncMadeAsync(); System.out.println("------------ flatMapExampleSync"); flatMapExampleSync(); System.out.println("------------ flatMapExampleAsync"); flatMapExampleAsync(); System.out.println("------------"); } private static void mergingAsync() { Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); } private static void mergingSync() { // here you'll see the delay as each is executed synchronously Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); } private static void mergingSyncMadeAsync() { // if you have something synchronous and want to make it async, you can schedule it like this // so here we see both executed concurrently Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); } private static void flatMapExampleAsync() { Observable.range(0, 5).flatMap(i -> { return getDataAsync(i); }).toBlocking().forEach(System.out::println); } private static void flatMapExampleSync() { Observable.range(0, 5).flatMap(i -> { return getDataSync(i); }).toBlocking().forEach(System.out::println); } // artificial representations of IO work static Observable<Integer> getDataAsync(int i) { return getDataSync(i).subscribeOn(Schedulers.io()); } static Observable<Integer> getDataSync(int i) { return Observable.create((Subscriber<? super Integer> s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } s.onNext(i); s.onCompleted(); }); } }Ниже приведена попытка предоставить пример, который более точно соответствует вашему коду:
import java.util.List; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable<Tile> searchTile = getSearchResults("search term") .doOnSubscribe(() -> logTime("Search started ", startTime)) .doOnCompleted(() -> logTime("Search completed ", startTime)); Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); Observable<String> imageUrl = getProductImage(t.getProductId()) .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); }); List<TileResponse> allTiles = populatedTiles.toList() .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) .toBlocking().single(); } private static Observable<Tile> getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable<Reviews> getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable<String> getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static <T> Observable<T> mockClient(T... ts) { return Observable.create((Subscriber<? super T> s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); }).subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } }вот результаты:
Search started => 65ms Search completed => 1094ms getProductImage[1] completed => 2095ms getSellerReviews[2] completed => 2095ms getProductImage[3] completed => 2095ms zip[1] completed => 2096ms zip[2] completed => 2096ms getProductImage[2] completed => 2096ms getSellerReviews[1] completed => 2096ms zip[3] completed => 2096ms All Tiles Completed => 2097ms getSellerReviews[3] completed => 2097msЯ сделал каждый вызов IO моделироваться, чтобы взять 1000 мс, так что очевидно, где задержка и что это происходит параллельно. Он распечатывает прогресс, который происходит за истекшие миллисекунды.
трюк здесь заключается в том, что flatMap объединяет асинхронные вызовы, поэтому до тех пор, пока объединяемые наблюдаемые являются асинхронными, все они будут выполняться одновременно.
если как
getProductImage(t.getProductId())был синхронным, его можно сделать асинхронным следующим образом: getProductImage (t. getProductId ()). subscribeOn(Schedulers.io).вот важная часть приведенного выше примера без всех протоколирования и шаблонные типы:
Observable<Tile> searchTile = getSearchResults("search term");; Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()); Observable<String> imageUrl = getProductImage(t.getProductId()); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }); }); List<TileResponse> allTiles = populatedTiles.toList() .toBlocking().single();Я надеюсь, что это помогает.
люди, которые все еще @ JDK 7, чья IDE еще не автоматически обнаруживает источник JDK 8, и что попробовать вышеупомянутый блестящий ответ (и объяснение) от @benjchristensen, могут использовать этот бесстыдно преломленный код JDK 7. Слава @benjchristensen за удивительное объяснение и пример !
import java.util.List; import rx.Observable; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Func2; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable<Tile> searchTile = getSearchResults("search term") .doOnSubscribe(new Action0() { @Override public void call() { logTime("Search started ", startTime); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("Search completed ", startTime); } }); Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>() { @Override public Observable<TileResponse> call(final Tile t) { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getSellerReviews[" + t.id + "] completed ", startTime); } }); Observable<String> imageUrl = getProductImage(t.getProductId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getProductImage[" + t.id + "] completed ", startTime); } }); return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>() { @Override public TileResponse call(Reviews r, String u) { return new TileResponse(t, r, u); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("zip[" + t.id + "] completed ", startTime); } }); } }); List<TileResponse> allTiles = populatedTiles .toList() .doOnCompleted(new Action0() { @Override public void call() { logTime("All Tiles Completed ", startTime); } }) .toBlocking() .single(); } private static Observable<Tile> getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable<Reviews> getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable<String> getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static <T> Observable<T> mockClient(final T... ts) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(Subscriber<? super T> s) { try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); } }) .subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } }
Comments