Наиболее эффективный способ получить последний элемент потока



поток не имеет last() способ:



Stream<T> stream;
T last = stream.last(); // No such method


какой самый элегантный и/или эффективный способ получить последний элемент (или null для пустой поток)?

602   7  

7 ответов:

сделать сокращение, которое просто возвращает текущее значение:

Stream<T> stream;
T last = stream.reduce((a, b) -> b).orElse(null);

это сильно зависит от характера Stream. Имейте в виду, что "простой" не значит "эффективный". Если вы подозреваете, что поток очень большой, несущий тяжелые операции или имеющий источник, который заранее знает размер, следующее Может быть существенно более эффективным, чем простое решение:

static <T> T getLast(Stream<T> stream) {
    Spliterator<T> sp=stream.spliterator();
    if(sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        for(;;) {
            Spliterator<T> part=sp.trySplit();
            if(part==null) break;
            if(sp.getExactSizeIfKnown()==0) {
                sp=part;
                break;
            }
        }
    }
    T value=null;
    for(Iterator<T> it=recursive(sp); it.hasNext(); )
        value=it.next();
    return value;
}

private static <T> Iterator<T> recursive(Spliterator<T> sp) {
    Spliterator<T> prev=sp.trySplit();
    if(prev==null) return Spliterators.iterator(sp);
    Iterator<T> it=recursive(sp);
    if(it!=null && it.hasNext()) return it;
    return recursive(prev);
}

вы можете проиллюстрировать разницу на следующем примере:

String s=getLast(
    IntStream.range(0, 10_000_000).mapToObj(i-> {
        System.out.println("potential heavy operation on "+i);
        return String.valueOf(i);
    }).parallel()
);
System.out.println(s);

Он будет печатать:

potential heavy operation on 9999999
9999999

в других словом, он выполнял операцию не на первых 9999999 элементах, а только на последних.

Это просто рефакторинг Хольгерответ, потому что код, хотя и фантастический, немного трудно читать/понимать, особенно для людей, которые не были программистами C до Java. Надеюсь, мой переделанный пример класса-это немного легче для тех, кто не знаком с spliterators, что они делают, или как они работают.

public class LastElementFinderExample {
    public static void main(String[] args){
        String s = getLast(
            LongStream.range(0, 10_000_000_000L).mapToObj(i-> {
                System.out.println("potential heavy operation on "+i);
                return String.valueOf(i);
            }).parallel()
        );
        System.out.println(s);
    }

    public static <T> T getLast(Stream<T> stream){
        Spliterator<T> sp = stream.spliterator();
        if(isSized(sp)) {
            sp = getLastSplit(sp);
        }
        return getIteratorLastValue(getLastIterator(sp));
    }

    private static boolean isSized(Spliterator<?> sp){
        return sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED);
    }

    private static <T> Spliterator<T> getLastSplit(Spliterator<T> sp){
        return splitUntil(sp, s->s.getExactSizeIfKnown() == 0);
    }

    private static <T> Iterator<T> getLastIterator(Spliterator<T> sp) {
        return Spliterators.iterator(splitUntil(sp, null));
    }

    private static <T> T getIteratorLastValue(Iterator<T> it){
        T result = null;
        while (it.hasNext()){
            result = it.next();
        }
        return result;
    }

    private static <T> Spliterator<T> splitUntil(Spliterator<T> sp, Predicate<Spliterator<T>> condition){
        Spliterator<T> result = sp;
        for (Spliterator<T> part = sp.trySplit(); part != null; part = result.trySplit()){
            if (condition == null || condition.test(result)){
                result = part;
            }
        }
        return result;      
    }   
}

Я считаю, что это решение является более эффективным и читабельным, чем ХольгерС:

import java.util.Spliterator;
import static java.util.Spliterator.ORDERED;
import java.util.stream.Stream;

/**
 * @param <T>    the type of elements in the stream
 * @param stream a stream
 * @return the last element in the stream
 * @throws AssertionError if the stream is unordered
 */
public static <T> Optional<T> getLastElement(Stream<T> stream)
{
    Spliterator<T> spliterator = stream.spliterator();
    assert (spliterator.hasCharacteristics(ORDERED)): "Operation makes no sense on unordered streams";

    // First we skip as many elements as possible
    Consumer<T> noOp = input -> {};
    while (true) {
        // trySplit() moves the first spliterator forward by the size of the second spliterator
        Spliterator<T> second = spliterator.trySplit();
        if (second == null)
            break;
        if (!spliterator.tryAdvance(noOp)) {
            // If the first spliterator is empty, continue splitting the second spliterator
            spliterator = second;
        }
    }

    // Then we consume the last element(s)
    LastElementConsumer<T> consumer = new LastElementConsumer<>();
    spliterator.forEachRemaining(consumer);
    return consumer.get();
}

[...]

import java.util.Optional;
import java.util.function.Consumer;

/**
 * A consumer that returns the last value that was consumed.
 * <p>
 * @param <T> the type of elements to consume
 * @author Gili Tzabari
 */
public final class LastElementConsumer<T> implements Consumer<T>
{
    private Optional<T> result = Optional.empty();

    @Override
    public void accept(T t)
    {
        result = Optional.of(t);
    }

    /**
     * @return the last value that was consumed
     */
    public Optional<T> get()
    {
        return result;
    }
}

если вы запустите:

String s = getLastElement(IntStream.range(0, 10_000_000).mapToObj(i->
  {
    System.out.println("Potential heavy operation on " + i);
    return String.valueOf(i);
  }).parallel()
);
System.out.println(s);

он будет печатать тот же вывод, что и решение Хольгера:

Potential heavy operation on 9999999
9999999

другими словами, он не выполнял операцию на первых 9999999 элементах, а только на последнем.

вот еще одно решение (не эффективных):

List<String> list = Arrays.asList("abc","ab","cc");
long count = list.stream().count();
list.stream().skip(count-1).findFirst().ifPresent(System.out::println);

параллельные безразмерные потоки с методами 'skip' сложны, и реализация @Holger дает неправильный ответ. Также реализация @Holger немного медленнее, потому что она использует итераторы.

оптимизация @Holger ответ:

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();
    Spliterator<? extends T> lastSpliterator = spliterator;

    // Note that this method does not work very well with:
    // unsized parallel streams when used with skip methods.
    // on that cases it will answer Optional.empty.

    // Find the last spliterator with estimate size
    // Meaningfull only on unsized parallel streams
    if(spliterator.estimateSize() == Long.MAX_VALUE) {
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            lastSpliterator = prev;
        }
    }

    // Find the last spliterator on sized streams
    // Meaningfull only on parallel streams (note that unsized was transformed in sized)
    for (Spliterator<? extends T> prev = lastSpliterator.trySplit(); prev != null; prev = lastSpliterator.trySplit()) {
        if (lastSpliterator.estimateSize() == 0) {
            lastSpliterator = prev;
            break;
        }
    }

    // Find the last element of the last spliterator
    // Parallel streams only performs operation on one element
    AtomicReference<T> last = new AtomicReference<>();
    lastSpliterator.forEachRemaining(last::set);

    return Optional.ofNullable(last.get());
}

модульное тестирование с использованием junit 5:

@Test
@DisplayName("last sequential sized")
void last_sequential_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last sequential unsized")
void last_sequential_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last parallel sized")
void last_parallel_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("getLast parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    // Unfortunately unsized parallel streams does not work very well with skip
    //assertThat(Streams.last(stream)).hasValue(expected);
    //assertThat(count).hasValue(1);

    // @Holger implementation gives wrong answer!!
    //assertThat(Streams.getLast(stream)).hasValue(9_950_000L); //!!!
    //assertThat(count).hasValue(1);

    // This is also not a very good answer better
    assertThat(Streams.last(stream)).isEmpty();
    assertThat(count).hasValue(0);
}

единственное решение для поддержки обоих сценариев заключается в том, чтобы избежать обнаружения последнего разделителя на некрупных параллельных потоках. Следствием этого является то, что решение будет выполнять операции на всех элементах, но это даст всегда правильный ответ.

обратите внимание, что в последовательных потоках он в любом случае будет выполнять операции со всеми элементами.

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();

    // Find the last spliterator with estimate size (sized parallel streams)
    if(spliterator.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        // Find the last spliterator on sized streams (parallel streams)
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            if (spliterator.getExactSizeIfKnown() == 0) {
                spliterator = prev;
                break;
            }
        }
    }

    // Find the last element of the spliterator
    //AtomicReference<T> last = new AtomicReference<>();
    //spliterator.forEachRemaining(last::set);

    //return Optional.ofNullable(last.get());

    // A better one that supports native parallel streams
    return (Optional<T>) StreamSupport.stream(spliterator, stream.isParallel())
            .reduce((a, b) -> b);
}

что касается модульного тестирования для этой реализации, первые три теста точно такие же (последовательные и параллельные). Тесты для безразмерный параллель здесь:

@Test
@DisplayName("last parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(10_000_000L);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

гуава имеет потоки.findlast ядро:

Stream<T> stream;
T last = Streams.findLast(stream);

Comments

    Ничего не найдено.