Очередь как тема в RxJava



Я ищу предмет (или что-то подобное), который может:




  1. может принимать элементы и удерживать их в очереди или буфере, если нет подписчиков

  2. Как только у нас есть подписчик, все элементы потребляются и никогда не испускаются снова

  3. я могу подписаться/отписаться от темы


BehaviorSubject почти выполнила бы задание, но при этом сохранила бы последний наблюдаемый пункт.



Обновить



Основываясь на принятом ответе, я разработал аналогичное решение для одного наблюдаемый пункт. Также добавлена часть отписки, чтобы избежать утечек памяти.



class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {

fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}

companion object {
fun create(): LastEventObservable {
val state = State()

val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()

val subscription = state.subscriber.subscribe(subscriber)

subscriber.add(Subscriptions.create { subscription.unsubscribe() })
}

return LastEventObservable(onSubscribe, state)
}
}

private class State {
var lastItem: Any? = null
val subscriber = PublishSubject.create<Any>()
}
}
357   3  

3 ответов:

Я достигаю ожидаемого результата, создавая настраиваемый Observable, который обертывает тему публикации и обрабатывает кэш эмиссии, если нет подключенных подписчиков. Проверить его.

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        MyCustomObservable myCustomObservable = new MyCustomObservable();

        myCustomObservable.emit("1");
        myCustomObservable.emit("2");
        myCustomObservable.emit("3");

        Subscription subscription = myCustomObservable.subscribe(System.out::println);

        myCustomObservable.emit("4");
        myCustomObservable.emit("5");

        subscription.unsubscribe();

        myCustomObservable.emit("6");
        myCustomObservable.emit("7");
        myCustomObservable.emit("8");

        myCustomObservable.subscribe(System.out::println);
    }
}

class MyCustomObservable extends Observable<String> {
    private static PublishSubject<String> publishSubject = PublishSubject.create();
    private static List<String> valuesCache = new ArrayList<>();

    protected MyCustomObservable() {
        super(subscriber -> {
            Observable.from(valuesCache)
                    .doOnNext(subscriber::onNext)
                    .doOnCompleted(valuesCache::clear)
                    .subscribe();

            publishSubject.subscribe(subscriber);
        });
    }

    public void emit(String value) {
        if (publishSubject.hasObservers()) {
            publishSubject.onNext(value);
        } else {
            valuesCache.add(value);
        }
    }
}

Надеюсь, что это поможет!

С Наилучшими Пожеланиями.

Если вы хотите дождаться только одного подписчика, используйте UnicastSubject, но обратите внимание, что если вы откажетесь от подписки в середине, все последующие элементы очереди будут потеряны.

Правка:

Как только у нас естьабонент , все элементы потребляются и никогда не испускаются снова

Для нескольких абонентов используйте ReplaySubject.

У меня была аналогичная проблема, мои требования были:

  • должен поддерживать воспроизведение значений, когда ни один наблюдатель не подписан
  • должно позволять только одному наблюдателю подписываться одновременно
  • должен позволить другому наблюдателю подписаться, когда первый наблюдатель будет удален

Я реализовал его как RxRelay , но реализация для субъекта будет аналогичной:

public final class CacheRelay<T> extends Relay<T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final PublishRelay<T> relay = PublishRelay.create();

    private CacheRelay() {
    }

    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }

    @Override
    public void accept(T value) {
        if (relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
        } else {
            for (T element; (element = queue.poll()) != null; ) {
                observer.onNext(element);
            }
            relay.subscribeActual(observer);
        }
    }
}

Посмотрите на эту суть Подробнее

Comments

    Ничего не найдено.