Java 8 поток с пакетной обработкой



у меня есть большой файл, который содержит список элементов.



Я хотел бы создать пакет элементов, сделать HTTP-запрос с этим пакетом (все элементы необходимы в качестве параметров в HTTP-запросе). Я могу сделать это очень легко с for цикл, но как любитель Java 8, я хочу попробовать написать это с помощью Java 8 Stream framework (и воспользоваться преимуществами ленивой обработки).



пример:



List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);


Я хочу сделать что-то на длинной линии из
lazyFileStream.group(500).map(processBatch).collect(toList())



что было бы лучшим способом сделать это?

681   10  

10 ответов:

вы могли бы сделать это с jOOλ, библиотека, которая расширяет потоки Java 8 для однопоточных, последовательных случаев использования потока:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

за кулисами zipWithIndex() - это просто:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... тогда как groupBy() это API удобство для:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(отказ от ответственности: я работаю в компании за jOOλ)

для полноты картины, вот гуавы решение.

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

в вопросе коллекция доступна, поэтому поток не нужен и его можно записать так:

Iterables.partition(data, batchSize).forEach(this::process);

чистая реализация Java-8 также возможна:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

обратите внимание, что в отличие от JOOl он может прекрасно работать параллельно (при условии, что ваш data список с произвольным доступом).

чистое решение Java 8:

мы можем создать пользовательский коллектор, чтобы сделать это элегантно, который принимает в batch size и Consumer для обработки каждого пакета:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

при необходимости создайте вспомогательный служебный класс:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

пример использования:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

Я также разместил свой код на GitHub, если кто-то хочет взглянуть:

ссылка на Github

Я написал пользовательский разделитель для таких сценариев. Он будет заполнять списки заданного размера из входного потока. Преимущество этого подхода заключается в том, что он будет выполнять ленивую обработку, и он будет работать с другими функциями потока.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}

вы также можете использовать RxJava:

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

или

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

или

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();

вы также можете взглянуть на Циклоп-реагировать, Я автор этой библиотеки. Он реализует интерфейс jOOλ (и по расширению JDK 8 Streams), но в отличие от параллельных потоков JDK 8 он фокусируется на асинхронных операциях (таких как потенциально блокирующие асинхронные вызовы ввода-вывода). Параллельные потоки JDK, напротив, фокусируются на параллелизме данных для операций, связанных с ЦП. Он работает путем управления агрегатами будущих задач на основе под капотом, но представляет стандартный расширенный поток API для конечные получатели.

этот пример кода может помочь вам начать

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

есть учебник по дозированию здесь

и более общий учебник здесь

чтобы использовать свой собственный пул потоков (который, вероятно, более подходит для блокировки ввода-вывода), вы можете начать обработку с

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();

у нас была похожая проблема. Мы хотели взять поток, который был больше, чем системная память (итерация по всем объектам в базе данных) и рандомизировать порядок как можно лучше - мы думали, что было бы хорошо буферизировать 10 000 элементов и рандомизировать их.

целью была функция, которая принимала поток.

из предложенных здесь решений, кажется, есть целый ряд вариантов:

  • используйте различные не-java 8 дополнительные библиотеки
  • начните с чего - то, что не является потоком-например, список произвольного доступа
  • есть поток, который можно легко разделить в spliterator

наш инстинкт изначально состоял в том, чтобы использовать пользовательский коллектор, но это означало отказ от потоковой передачи. Решение custom collector выше очень хорошо, и мы почти использовали его.

вот решение, которое обманывает, используя тот факт, что StreamS может дать вам Iterator который вы можете использовать как люк чтобы позволить вам сделать что-то дополнительное, что потоки не поддерживают. Элемент Iterator преобразуется обратно в поток, используя другой бит Java 8 StreamSupport колдовство.

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

простой пример использования этого будет выглядеть так:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

выше выводит

[A, B, C]
[D, E, F]

для нашего случая использования мы хотели перетасовать пакеты, а затем сохранить их в виде потока - это выглядело так:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

это выводит что-то как (это рандомизировано, так отличается каждый раз)

A
C
B
E
D
F

секрет соуса здесь заключается в том, что всегда есть поток, поэтому вы можете либо работать с потоком партий, либо что-то делать с каждой партией, а затем flatMap обратно в поток. Еще лучше, все вышеперечисленное работает только как финал forEach или collect или другие завершающие выражения тянуть данные через поток.

получается, что iterator - это особый тип завершение работы на потоке и не вызывает весь поток, чтобы запустить и прийти в память! Спасибо ребятам Java 8 за блестящий дизайн!

простой пример использования Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

ответ Брюса более полный, но я искал что-то быстрое и грязное, чтобы обработать кучу файлов.

чистой Java 8 пример, который работает с параллельными потоками, а также.

Как использовать:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

объявление и реализация метода:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}

Comments

    Ничего не найдено.