Разница между потоками Java 8 и наблюдаемыми RxJava



являются ли потоки Java 8 похожими на наблюдаемые RxJava?



Java 8 определение потока:




классы в новом java.util.stream пакет предоставляет API потока для
поддержка операций функционального стиля над потоками элементов.


1040   7  

7 ответов:

TL; DR: все библиотеки обработки последовательности / потока предлагают очень похожий API для строительства трубопроводов. Различия заключаются в API для обработки многопоточности и композиции трубопроводов.

RxJava сильно отличается от Stream. Из всех вещей JDK, самый близкий к rx.Наблюдаемый, возможно, java.утиль.поток.Коллекционер Stream + CompletableFuture combo (который поставляется за счет работы с дополнительным слоем монады, т. е. для обработки преобразования между Stream<CompletableFuture<T>> и CompletableFuture<Stream<T>>).

существуют значительные различия между Observable и Stream:

  • потоки основаны на вытягивании, наблюдаемые основаны на выталкивании. Это может показаться слишком абстрактным, но это имеет значительные последствия, которые очень конкретны.
  • поток может быть использован только один раз, наблюдаемый может быть подписан на много раз
  • Stream#parallel() разбивает последовательность на разделы Observable#subscribeOn() и Observable#observeOn() не надо; это сложно эмулировать Stream#parallel() поведение с наблюдаемым, это когда-то было .parallel() метод но этот метод вызвал столько путаницы, что .parallel() поддержка была перемещена в отдельный репозиторий на github, RxJavaParallel. Более подробная информация находится в еще один ответ.
  • Stream#parallel() не позволяет указать пул потоков для использования, в отличие от большинства методов RxJava, принимающих необязательный планировщик. Так как все потоковые экземпляры в JVM используют один и тот же пул fork-join, добавляя .parallel() может случайно влияют поведение в другом модуле программы
  • потоки не хватает операций, связанных с временем, как Observable#interval(),Observable#window() и многие другие; это в основном потому, что потоки на основе pull
  • потоки предлагают ограниченный набор операций по сравнению с RxJava. Например, в потоках отсутствуют операции отсечки (takeWhile(),takeUntil()); обходной путь с помощью Stream#anyMatch() ограничено: это терминальная операция, поэтому вы не можете использовать ее более одного раза в потоке
  • по состоянию на JDK 8, нет никакой операции Stream#zip, которая иногда очень полезна
  • потоки трудно построить самостоятельно, наблюдаемые могут быть построены многими способамиEDIT: как отмечается в комментариях, есть способы построить поток. Однако, поскольку нет никакого нетерминального короткого замыкания, вы не можете, например, легко генерировать поток строк в файле (JDK предоставляет файлы#lines и BufferedReader#lines из коробки, хотя и другие подобные сценарии могут быть управляется путем построения потока из итератора).
  • Observable предлагает средство управления ресурсами (Observable#using()); вы можете обернуть поток ввода-вывода или мьютекс с ним и быть уверенным, что пользователь не забудет освободить ресурс - он будет удален автоматически при прекращении подписки; поток имеет onClose(Runnable) метод, но вы должны вызвать его вручную или через try-with-resources. Например, вы должны иметь в виду, что файлы#lines() должны быть заключенным в try-with-resources блок.
  • наблюдаемые синхронизируются полностью (я на самом деле не проверял, верно ли то же самое для потоков). Это избавляет вас от размышлений о том, являются ли основные операции потокобезопасными (ответ всегда "да", если нет ошибки), но накладные расходы, связанные с параллелизмом, будут там, независимо от того, нужен ли ваш код или нет.

Round-up: RxJava значительно отличается от потоков. Реальные альтернативы RxJava - это другие реализации ReactiveStreams, например, соответствующая часть Акка.

обновление. Есть трюк, чтобы использовать не по умолчанию fork-join pool для Stream#parallel см. пользовательский пул потоков в параллельном потоке Java 8

обновление. Все вышесказанное основано на опыте работы с RxJava 1.х. Теперь что RxJava 2.x здесь этот ответ может быть устаревшей.

Java 8 Stream и RxJava выглядят довольно похоже. Они имеют похожие операторы (фильтр, карта, flatMap...) но не построены для того же использования.

вы можете выполнять задачи asynchonus с помощью RxJava.

С Java 8 stream, вы будете пересекать элементы вашей коллекции.

вы можете сделать почти то же самое в RxJava (пересекать элементы коллекции), но, поскольку RxJava сосредоточен на параллельной задаче,..., он использует синхронизацию, защелку ... Так что та же задача использование RxJava может быть медленнее, чем с потоком Java 8.

RxJava можно сравнить с CompletableFuture, но это может быть в состоянии вычислить более одного значения.

существует несколько технических и концептуальных различий, например, потоки Java 8 являются одиночными, основанными на вытягивании, синхронными последовательностями значений, тогда как наблюдаемые RxJava являются повторно наблюдаемыми, адаптивно двухтактными, потенциально асинхронными последовательностями значений. RxJava ориентирован на Java 6+ и работает также на Android.

потоки Java 8 основаны на вытягивании. Вы перебираете поток Java 8, потребляющий каждый элемент. И это может быть бесконечный поток.

RXJava Observable на нажимаем по умолчанию. Вы подписываетесь на Observable и вы получите уведомление, когда следующий пункт прибывает (onNext), или когда поток будет завершен (onCompleted), или когда произошла ошибка (onError). Потому что с Observable появляется onNext,onCompleted,onError событий, вы можете сделать некоторые мощные функции, такие как объединение разные Observable s к новому (zip,merge,concat). Другие вещи, которые вы могли бы сделать, это кэширование, дросселирование ... И он использует более или менее один и тот же API на разных языках (RxJava, RX в C#, RxJS, ...)

по умолчанию RxJava является однопоточным. Если вы не начнете использовать планировщики, все будет происходить в одном потоке.

существующие ответы являются всеобъемлющими и правильными, но четкого примера для начинающих не хватает. Позвольте мне поставить некоторые конкретные термины, такие как" push/pull-based "и"re-observable". Примечание: Я ненавижу этот термин Observable (это поток ради Бога), поэтому будет просто ссылаться на потоки J8 vs RX.

рассмотрим список, если целые,

digits = [1,2,3,4,5]

поток J8-это утилита для изменения коллекции. Например, даже цифры могут быть извлечены как,

evens = digits.stream().filter(x -> x%2)

это в основном в Python карта, фильтр, уменьшить, очень хорошее (и давно назревшее) дополнение к Java. Но что, если цифры не были сохранены заранее? Можно ли еще фильтровать четные цифры?

представьте, что отдельный процесс потока выводит целые числа случайным образом (--- обозначает время)

digits = 12345---6------7--8--9-10--------11--12

в RX, evenможете реагировать к каждой новой цифре и применить фильтр в в режиме реального времени

even = -2-4-----6---------8----10------------12

нет необходимости хранить списки ввода и вывода. Если вы хочу выходной список, нет проблем, что это потоковое тоже. На самом деле,все это поток.

evens_stored = even.collect()  

вот почему такие термины, как" безгосударственный "и" функциональный", больше связаны с RX

RxJava также тесно связан с инициатива реактивных потоков и рассматривает его как простую реализацию API реактивных потоков (например, по сравнению с реализация потоков Akka). Основное различие заключается в том, что реактивные потоки предназначены для обработки обратного давления, но если вы посмотрите на страницу реактивных потоков, вы получите идею. Они довольно хорошо описывают свои цели, и потоки также тесно связаны с реактивный манифест.

потоки Java 8 в значительной степени являются реализацией неограниченной коллекции, очень похожей на Scala Stream или Clojure lazy seq.

потоки Java 8 позволяют эффективно обрабатывать действительно большие коллекции, используя при этом многоядерные архитектуры. Напротив, RxJava является однопоточным по умолчанию (без планировщиков). Поэтому RxJava не будет использовать преимущества многоядерных машин, если вы сами не закодируете эту логику.

Comments

    Ничего не найдено.