Сбор последовательных пар из потока



учитывая поток, такой как { 0, 1, 2, 3, 4 },



Как я могу наиболее элегантно преобразовать его в заданную форму:



{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }



(предполагая, конечно, что я определил пару классов)?



Edit: речь идет не только о ints или примитивных потоках. Ответ должен быть общим для потока любого типа.

552   18  

18 ответов:

мой StreamEx библиотека, которая расширяет стандартные потоки предоставляет pairMap способ для всех типов потоков. Для примитивных потоков он не изменяет тип потока, но может быть использован для выполнения некоторых вычислений. Наиболее распространенное использование для расчета различий:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

для потока объектов можно создать любой другой тип объекта. Моя библиотека не предоставляет никаких новых видимых пользователем структур данных, таких как Pair (это часть концепции библиотека). Однако если у вас есть свой Pair классе и вы хотите использовать его, вы можете сделать следующее:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

или если у вас уже есть некоторые Stream:

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

эта функция реализована с помощью настраиваемые последний. Он имеет довольно низкие накладные расходы и может хорошо распараллеливаться. Конечно, он работает с любым источником потока, а не только с произвольным доступом к списку/массиву, как и многие другие решения. Во многих тестах он работает очень хорошо. здесь a JMH benchmark где мы находим все входные значения, предшествующие большему значению, используя различные подходы (см. этой вопрос).

библиотека потоков Java 8 в основном ориентирована на разделение потоков на более мелкие фрагменты для параллельной обработки, поэтому этапы конвейера с отслеживанием состояния довольно ограничены, и такие действия, как получение индекса текущего элемента потока и доступ к соседним элементам потока, не поддерживаются.

типичный способ решения этих проблем, с некоторыми ограничениями, конечно, заключается в том, чтобы управлять потоком по индексам и полагаться на то, что значения обрабатываются в некоторых данных произвольного доступа структура, подобная ArrayList, из которого можно извлечь элементы. Если значения были в arrayList, можно было бы генерировать пары по запросу, делая что-то вроде этого:

    IntStream.range(1, arrayList.size())
             .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
             .forEach(System.out::println);

конечно ограничение заключается в том, что вход не может быть бесконечный поток. Однако этот конвейер может выполняться параллельно.

нет элегантного, это хакерское решение, но работает для бесконечных потоков

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers
    new Function<Integer, Pair>() {
        Integer previous;

        @Override
        public Pair apply(Integer integer) {
            Pair pair = null;
            if (previous != null) pair = new Pair(previous, integer);
            previous = integer;
            return pair;
        }
    }).skip(1); // drop first null

Теперь вы можете ограничить свой поток до тех пор, как вы хотите

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

П. С. Я надеюсь, что есть лучшее решение, что-то вроде clojure (partition 2 1 stream)

я реализовал оболочку spliterator, которая принимает каждый n элементов T из оригинального spliterator и производит List<T>:

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> {

    private final Spliterator<T> wrappedSpliterator;

    private final int n;

    private final Deque<T> deque;

    private final Consumer<T> dequeConsumer;

    public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) {
        this.wrappedSpliterator = wrappedSpliterator;
        this.n = n;
        this.deque = new ArrayDeque<>();
        this.dequeConsumer = deque::addLast;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        deque.pollFirst();
        fillDeque();
        if (deque.size() == n) {
            List<T> list = new ArrayList<>(deque);
            action.accept(list);
            return true;
        } else {
            return false;
        }
    }

    private void fillDeque() {
        while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer))
            ;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return wrappedSpliterator.estimateSize();
    }

    @Override
    public int characteristics() {
        return wrappedSpliterator.characteristics();
    }
}

для создания последовательного потока можно использовать следующий метод:

public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) {
    Spliterator<E> spliterator = stream.spliterator();
    Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n);
    return StreamSupport.stream(wrapper, false);
}

пример использования:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2)
    .map(list -> new Pair(list.get(0), list.get(1)))
    .forEach(System.out::println);

вы можете сделать это Циклоп-реагировать (Я вношу свой вклад в эту библиотеку), используя скользящий оператор.

  LazyFutureStream.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

или

   ReactiveSeq.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

предполагая, что конструктор пар может принимать коллекцию с 2 элементами.

если вы хотите сгруппировать по 4 и увеличить на 2, что также поддерживается.

     ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE)
                .sliding(4,2)
                .forEach(System.out::println);

эквивалентные статические методы для создания скользящего вида над java.утиль.поток.Поток также обеспечен в циклопах-потоках StreamUtils класса.

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

Примечание:-Для однопоточной работы ReactiveSeq будет более подходящим. LazyFutureStream расширяет ReactiveSeq, но в первую очередь ориентирован на параллельное / параллельное использование (это поток фьючерсов).

LazyFutureStream расширяет ReactiveSeq, который расширяет Seq от awesome jOOλ (который расширяет java.утиль.поток.Stream), поэтому представленные решения Lukas также будут работать с любым типом потока. Для тех, кто заинтересован основными различиями между операторами окна / скольжения являются очевидный относительный компромисс мощности / сложности и пригодность для использования с бесконечными потоками (скольжение не потребляет поток, а буферизует его по мере движения).

вы можете сделать это с потоком.метод reduce () (я не видел никаких других ответов, использующих эту технику).

public static <T> List<Pair<T, T>> consecutive(List<T> list) {
    List<Pair<T, T>> pairs = new LinkedList<>();
    list.stream().reduce((a, b) -> {
        pairs.add(new Pair<>(a, b));
        return b;
    });
    return pairs;
}

The библиотека протонного пакета обеспечивает оконную функциональность. Учитывая парный класс и поток, вы можете сделать это следующим образом:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1);
Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1)
                                                  .map(l -> new Pair<>(l.get(0), l.get(1)))
                                                  .moreStreamOps(...);

Теперь pairs Stream содержит:

(0, 1)
(1, 2)
(2, 3)
(3, 4)
(4, ...) and so on

Поиск последовательных парах

если вы хотите использовать стороннюю библиотеку и не нуждаетесь в параллелизме, то jOOλ предлагает функции окна в стиле SQL следующим образом

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window()
   .filter(w -> w.lead().isPresent())
   .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class
   .toList()
);

урожайность

[(0, 1), (1, 2), (2, 3), (3, 4)]

The lead() функция обращается к следующему значению в порядке обхода из окна.

Поиск последовательных тройки / четверки / N-кортежей

вопрос в комментариях просил более общего решение, где должны быть собраны не пары, а n-кортежи (или, возможно, списки). Вот таким образом, альтернативный подход:

int n = 3;

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

получение списка списков

[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

без filter(w -> w.count() == n) результат будет

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

отказ от ответственности: я работаю в компании за jOOλ

можно использовать RxJava (очень мощный реактивные расширения библиотека)

IntStream intStream  = IntStream.iterate(1, n -> n + 1);

Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1);

pairObservable.take(10).forEach(b -> {
            b.forEach(n -> System.out.println(n));
            System.out.println();
        });

на буфероператор превращает заметить, что испускает элементы в наблюдаемый, который испускает буферизованные коллекции те пункты..

в вашем случае я бы написал свою пользовательскую функцию IntFunction, которая отслеживает последний переданный int и использует ее для отображения исходного IntStream.

import java.util.function.IntFunction;
import java.util.stream.IntStream;

public class PairFunction implements IntFunction<PairFunction.Pair> {

  public static class Pair {

    private final int first;
    private final int second;

    public Pair(int first, int second) {
      this.first = first;
      this.second = second;
    }

    @Override
    public String toString() {
      return "[" + first + "|" + second + "]";
    }
  }

  private int last;
  private boolean first = true;

  @Override
  public Pair apply(int value) {
    Pair pair = !first ? new Pair(last, value) : null;
    last = value;
    first = false;
    return pair;
  }

  public static void main(String[] args) {

    IntStream intStream = IntStream.of(0, 1, 2, 3, 4);
    final PairFunction pairFunction = new PairFunction();
    intStream.mapToObj(pairFunction)
        .filter(p -> p != null) // filter out the null
        .forEach(System.out::println); // display each Pair

  }

}

операция по существу является статусной, поэтому не совсем то, что потоки предназначены для решения-см. раздел "поведение без состояния" в документация:

лучший подход заключается в том, чтобы полностью избегать поведенческих параметров с сохранением состояния для потоковых операций

одним из решений здесь является введение состояния в ваш поток через внешний счетчик, хотя он будет работать только с последовательным потоком.

public static void main(String[] args) {
    Stream<String> strings = Stream.of("a", "b", "c", "c");
    AtomicReference<String> previous = new AtomicReference<>();
    List<Pair> collect = strings.map(n -> {
                            String p = previous.getAndSet(n);
                            return p == null ? null : new Pair(p, n);
                        })
                        .filter(p -> p != null)
                        .collect(toList());
    System.out.println(collect);
}


static class Pair<T> {
    private T left, right;
    Pair(T left, T right) { this.left = left; this.right = right; }
    @Override public String toString() { return "{" + left + "," + right + '}'; }
}

для вычисления последовательных разностей во времени (x-значениях) временного ряда я использую stream ' s collect(...) способ:

final List< Long > intervals = timeSeries.data().stream()
                    .map( TimeSeries.Datum::x )
                    .collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine )
                    .intervals();

где DifferenceCollector что-то вроде этого:

public class DifferenceCollector implements LongConsumer
{
    private final List< Long > intervals = new ArrayList<>();
    private Long lastTime;

    @Override
    public void accept( final long time )
    {
        if( Objects.isNull( lastTime ) )
        {
            lastTime = time;
        }
        else
        {
            intervals.add( time - lastTime );
            lastTime = time;
        }
    }

    public void combine( final DifferenceCollector other )
    {
        intervals.addAll( other.intervals );
        lastTime = other.lastTime;
    }

    public List< Long > intervals()
    {
        return intervals;
    }
}

вы могли бы, вероятно, изменить это в соответствии с вашими потребностями.

элегантным решением было бы использовать zip. Что-то вроде:

List<Integer> input = Arrays.asList(0, 1, 2, 3, 4);
Stream<Pair> pairStream = Streams.zip(input.stream(),
                                      input.stream().substream(1),
                                      (a, b) -> new Pair(a, b)
);

Это довольно лаконично и элегантно, однако он использует список в качестве входных данных. Бесконечный источник потока не может быть обработан таким образом.

еще одна (гораздо более хлопотная) проблема заключается в том, что zip вместе со всем классом Streams недавно был удален из API. Приведенный выше код работает только с b95 или более старые выпуски. Так что с последним JDK я бы сказал, что нет элегантное решение в стиле FP, и прямо сейчас мы можем просто надеяться, что каким-то образом zip будет вновь введен в API.

Это интересная проблема. Это мой гибрид попытка ниже любого хорошего?

public static void main(String[] args) {
    List<Integer> list = Arrays.asList(1, 2, 3);
    Iterator<Integer> first = list.iterator();
    first.next();
    if (first.hasNext())
        list.stream()
        .skip(1)
        .map(v -> new Pair(first.next(), v))
        .forEach(System.out::println);
}

Я считаю, что он не поддается параллельной обработке и, следовательно, может быть дисквалифицирован.

как заметили другие, из-за природы проблемы требуется некоторая статичность.

я столкнулся с аналогичной проблемой, в которой я хотел, чтобы то, что было по существу функцией Oracle SQL LEAD. Моя попытка реализовать это ниже.

/**
 * Stream that pairs each element in the stream with the next subsequent element.
 * The final pair will have only the first item, the second will be null.
 */
<T> Spliterator<Pair<T>> lead(final Stream<T> stream)
{
    final Iterator<T> input = stream.sequential().iterator();

    final Iterable<Pair<T>> iterable = () ->
    {
        return new Iterator<Pair<T>>()
        {
            Optional<T> current = getOptionalNext(input);

            @Override
            public boolean hasNext()
            {
                return current.isPresent();
            }

            @Override
            public Pair<T> next()
            {
                Optional<T> next = getOptionalNext(input);
                final Pair<T> pair = next.isPresent()
                    ? new Pair(current.get(), next.get())
                    : new Pair(current.get(), null);
                current = next;

                return pair;
            }
        };
    };

    return iterable.spliterator();
}

private <T> Optional<T> getOptionalNext(final Iterator<T> iterator)
{
    return iterator.hasNext()
        ? Optional.of(iterator.next())
        : Optional.empty();
}

вы можете достичь этого, используя ограниченную очередь для хранения элементов, которые протекают через поток (что основано на идее, которую я подробно описал здесь:можно ли получить следующий элемент в потоке?)

ниже пример сначала определяет экземпляр класса BoundedQueue, который будет хранить элементы, проходящие через поток (если вам не нравится идея расширения LinkedList, обратитесь к ссылке, упомянутой выше для альтернативного и более общего подхода). Позже вы просто объединяете два последующих элемента в экземпляр Pair:

public class TwoSubsequentElems {
  public static void main(String[] args) {
    List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4));

    class BoundedQueue<T> extends LinkedList<T> {
      public BoundedQueue<T> save(T curElem) {
        if (size() == 2) { // we need to know only two subsequent elements
          pollLast(); // remove last to keep only requested number of elements
        }

        offerFirst(curElem);

        return this;
      }

      public T getPrevious() {
        return (size() < 2) ? null : getLast();
      }

      public T getCurrent() {
        return (size() == 0) ? null : getFirst();
      }
    }

    BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>();

    final List<Pair<Integer>> answer = input.stream()
      .map(i -> streamHistory.save(i))
      .filter(e -> e.getPrevious() != null)
      .map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent()))
      .collect(Collectors.toList());

    answer.forEach(System.out::println);
  }
}

Я согласен с @aepurniet но вместо карты вы должны использовать mapToObj

range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println);

запустить for цикл, который выполняется от 0 до length-1 из вашего потока

for(int i = 0 ; i < stream.length-1 ; i++)
{
    Pair pair = new Pair(stream[i], stream[i+1]);
    // then add your pair to an array
}

Comments

    Ничего не найдено.