Очередь как тема в RxJava
Я ищу предмет (или что-то подобное), который может:
- может принимать элементы и удерживать их в очереди или буфере, если нет подписчиков
- Как только у нас есть подписчик, все элементы потребляются и никогда не испускаются снова
- я могу подписаться/отписаться от темы
BehaviorSubject почти выполнила бы задание, но при этом сохранила бы последний наблюдаемый пункт.
Обновить
Основываясь на принятом ответе, я разработал аналогичное решение для одного наблюдаемый пункт. Также добавлена часть отписки, чтобы избежать утечек памяти.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber)
subscriber.add(Subscriptions.create { subscription.unsubscribe() })
}
return LastEventObservable(onSubscribe, state)
}
}
private class State {
var lastItem: Any? = null
val subscriber = PublishSubject.create<Any>()
}
}
3 ответов:
Я достигаю ожидаемого результата, создавая настраиваемый Observable, который обертывает тему публикации и обрабатывает кэш эмиссии, если нет подключенных подписчиков. Проверить его.
public class ExampleUnitTest { @Test public void testSample() throws Exception { MyCustomObservable myCustomObservable = new MyCustomObservable(); myCustomObservable.emit("1"); myCustomObservable.emit("2"); myCustomObservable.emit("3"); Subscription subscription = myCustomObservable.subscribe(System.out::println); myCustomObservable.emit("4"); myCustomObservable.emit("5"); subscription.unsubscribe(); myCustomObservable.emit("6"); myCustomObservable.emit("7"); myCustomObservable.emit("8"); myCustomObservable.subscribe(System.out::println); } } class MyCustomObservable extends Observable<String> { private static PublishSubject<String> publishSubject = PublishSubject.create(); private static List<String> valuesCache = new ArrayList<>(); protected MyCustomObservable() { super(subscriber -> { Observable.from(valuesCache) .doOnNext(subscriber::onNext) .doOnCompleted(valuesCache::clear) .subscribe(); publishSubject.subscribe(subscriber); }); } public void emit(String value) { if (publishSubject.hasObservers()) { publishSubject.onNext(value); } else { valuesCache.add(value); } } }Надеюсь, что это поможет!
С Наилучшими Пожеланиями.
Если вы хотите дождаться только одного подписчика, используйте
UnicastSubject, но обратите внимание, что если вы откажетесь от подписки в середине, все последующие элементы очереди будут потеряны.Правка:
Как только у нас естьабонент , все элементы потребляются и никогда не испускаются снова
Для нескольких абонентов используйте
ReplaySubject.
У меня была аналогичная проблема, мои требования были:
- должен поддерживать воспроизведение значений, когда ни один наблюдатель не подписан
- должно позволять только одному наблюдателю подписываться одновременно
- должен позволить другому наблюдателю подписаться, когда первый наблюдатель будет удален
Я реализовал его как RxRelay , но реализация для субъекта будет аналогичной:
public final class CacheRelay<T> extends Relay<T> { private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); private final PublishRelay<T> relay = PublishRelay.create(); private CacheRelay() { } public static <T> CacheRelay<T> create() { return new CacheRelay<>(); } @Override public void accept(T value) { if (relay.hasObservers()) { relay.accept(value); } else { queue.add(value); } } @Override public boolean hasObservers() { return relay.hasObservers(); } @Override protected void subscribeActual(Observer<? super T> observer) { if (hasObservers()) { EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer); } else { for (T element; (element = queue.poll()) != null; ) { observer.onNext(element); } relay.subscribeActual(observer); } } }Посмотрите на эту суть Подробнее
Comments