Rx Java mergeDelayError не работает как ожидалось
Я использую RxJava в и Android-приложение с RxAndroid. Я использую mergeDelayError, чтобы объединить две сети ретро подходят звонков в одной наблюдаемой, которая будет обрабатывать выбрасываемых элементов, если дает один и об ошибках, если есть. Это не работает, и он только запускает действие onError, когда либо сталкивается с ошибкой. Теперь, чтобы проверить это, я перешел к очень простому примеру, и все же successAction никогда не вызывается, когда у меня есть вызов onError. Смотрите пример под.
Observable.mergeDelayError(
Observable.error(new RuntimeException()),
Observable.just("Hello")
)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.finallyDo(completeAction)
.subscribe(successAction, errorAction);
Действие успеха будет вызвано только в том случае, если я использую две наблюдаемые успешности. Я что-то упустил с тем, как mergeDelayError должен работать?
Редактировать:
Я обнаружил, что если я удаляю observeOn и subscribeOn, все работает так, как ожидалось. Мне нужно указать потоки, и я подумал, что в этом весь смысл использования Rx. Есть идеи, почему указание этих Schedulers нарушит поведение?
3 ответов:
Используйте .observeOn (AndroidSchedulers.mainThread (), true) вместо .observeOn (AndroidSchedulers.mainThread()
Выше приведена сигнатура функции observeOn. Следующий код работает.public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) { return observeOn(scheduler, delayError, RxRingBuffer.SIZE); }Observable.mergeDelayError( Observable.error(new RuntimeException()), Observable.just("Hello") ) .observeOn(AndroidSchedulers.mainThread(), true) .subscribeOn(Schedulers.io()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } });Получил этот трюк от потока ConcatDelayError: https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009
Это все еще кажется ошибкой в операторе mergeDelayError, но я смог заставить его работать, дублируя observerOn и подписываясь на каждый наблюдаемый.
Observable.mergeDelayError( Observable.error(new RuntimeException()) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()), Observable.just("Hello") .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) ) .finallyDo(completeAction) .subscribe(successAction, errorAction);
Я думаю, что вы не ждете терминального события, и основной поток завершает работу, прежде чем события будут доставлены вашему наблюдателю. Следующий тест проходит для меня с RxJava 1.0.14:
@Test public void errorDelayed() { TestSubscriber<Object> ts = TestSubscriber.create(); Observable.mergeDelayError( Observable.error(new RuntimeException()), Observable.just("Hello") ) .subscribeOn(Schedulers.io()).subscribe(ts); ts.awaitTerminalEvent(); ts.assertError(RuntimeException.class); ts.assertValue("Hello"); }
Comments