Java 8 поток с пакетной обработкой
у меня есть большой файл, который содержит список элементов.
Я хотел бы создать пакет элементов, сделать HTTP-запрос с этим пакетом (все элементы необходимы в качестве параметров в HTTP-запросе). Я могу сделать это очень легко с for цикл, но как любитель Java 8, я хочу попробовать написать это с помощью Java 8 Stream framework (и воспользоваться преимуществами ленивой обработки).
пример:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Я хочу сделать что-то на длинной линии из
lazyFileStream.group(500).map(processBatch).collect(toList())
что было бы лучшим способом сделать это?
10 ответов:
вы могли бы сделать это с jOOλ, библиотека, которая расширяет потоки Java 8 для однопоточных, последовательных случаев использования потока:
Seq.seq(lazyFileStream) // Seq<String> .zipWithIndex() // Seq<Tuple2<String, Long>> .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>> .forEach((index, batch) -> { process(batch); });за кулисами
zipWithIndex()- это просто:static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { final Iterator<T> it = stream.iterator(); class ZipWithIndex implements Iterator<Tuple2<T, Long>> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2<T, Long> next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); }... тогда как
groupBy()это API удобство для:default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { return collect(Collectors.groupingBy(classifier)); }(отказ от ответственности: я работаю в компании за jOOλ)
для полноты картины, вот гуавы решение.
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);в вопросе коллекция доступна, поэтому поток не нужен и его можно записать так:
Iterables.partition(data, batchSize).forEach(this::process);
чистая реализация Java-8 также возможна:
int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch));обратите внимание, что в отличие от JOOl он может прекрасно работать параллельно (при условии, что ваш
dataсписок с произвольным доступом).
чистое решение Java 8:
мы можем создать пользовательский коллектор, чтобы сделать это элегантно, который принимает в
batch sizeиConsumerдля обработки каждого пакета:import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param <T> Type of the elements being collected */ class BatchCollector<T> implements Collector<T, List<T>, List<T>> { private final int batchSize; private final Consumer<List<T>> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier<List<T>> supplier() { return ArrayList::new; } public BiConsumer<List<T>, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator<List<T>> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a strict batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function<List<T>, List<T>> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set<Characteristics> characteristics() { return Collections.emptySet(); } }при необходимости создайте вспомогательный служебный класс:
import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param <T> the type of elements being processed * @return a batch collector instance */ public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { return new BatchCollector<T>(batchSize, batchProcessor); } }пример использования:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> output = new ArrayList<>(); int batchSize = 3; Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor));Я также разместил свой код на GitHub, если кто-то хочет взглянуть:
Я написал пользовательский разделитель для таких сценариев. Он будет заполнять списки заданного размера из входного потока. Преимущество этого подхода заключается в том, что он будет выполнять ленивую обработку, и он будет работать с другими функциями потока.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator<E> implements Spliterator<List<E>> { private final Spliterator<E> base; private final int batchSize; public BatchSpliterator(Spliterator<E> base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer<? super List<E>> action) { final List<E> batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator<List<E>> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator<E> splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } }
вы также можете использовать RxJava:
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));или
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();или
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
вы также можете взглянуть на Циклоп-реагировать, Я автор этой библиотеки. Он реализует интерфейс jOOλ (и по расширению JDK 8 Streams), но в отличие от параллельных потоков JDK 8 он фокусируется на асинхронных операциях (таких как потенциально блокирующие асинхронные вызовы ввода-вывода). Параллельные потоки JDK, напротив, фокусируются на параллелизме данных для операций, связанных с ЦП. Он работает путем управления агрегатами будущих задач на основе под капотом, но представляет стандартный расширенный поток API для конечные получатели.
этот пример кода может помочь вам начать
LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run();есть учебник по дозированию здесь
чтобы использовать свой собственный пул потоков (который, вероятно, более подходит для блокировки ввода-вывода), вы можете начать обработку с
LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
у нас была похожая проблема. Мы хотели взять поток, который был больше, чем системная память (итерация по всем объектам в базе данных) и рандомизировать порядок как можно лучше - мы думали, что было бы хорошо буферизировать 10 000 элементов и рандомизировать их.
целью была функция, которая принимала поток.
из предложенных здесь решений, кажется, есть целый ряд вариантов:
- используйте различные не-java 8 дополнительные библиотеки
- начните с чего - то, что не является потоком-например, список произвольного доступа
- есть поток, который можно легко разделить в spliterator
наш инстинкт изначально состоял в том, чтобы использовать пользовательский коллектор, но это означало отказ от потоковой передачи. Решение custom collector выше очень хорошо, и мы почти использовали его.
вот решение, которое обманывает, используя тот факт, что
StreamS может дать вамIteratorкоторый вы можете использовать как люк чтобы позволить вам сделать что-то дополнительное, что потоки не поддерживают. ЭлементIteratorпреобразуется обратно в поток, используя другой бит Java 8StreamSupportколдовство./** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator<T> implements Iterator<List<T>> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param <T> type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static <T> Stream<T> asStream(Iterator<T> iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List<T> currentBatch; private Iterator<T> sourceIterator; public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List<T> next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } }простой пример использования этого будет выглядеть так:
@Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); }выше выводит
[A, B, C] [D, E, F]для нашего случая использования мы хотели перетасовать пакеты, а затем сохранить их в виде потока - это выглядело так:
@Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); }это выводит что-то как (это рандомизировано, так отличается каждый раз)
A C B E D Fсекрет соуса здесь заключается в том, что всегда есть поток, поэтому вы можете либо работать с потоком партий, либо что-то делать с каждой партией, а затем
flatMapобратно в поток. Еще лучше, все вышеперечисленное работает только как финалforEachилиcollectили другие завершающие выражения тянуть данные через поток.получается, что
iterator- это особый тип завершение работы на потоке и не вызывает весь поток, чтобы запустить и прийти в память! Спасибо ребятам Java 8 за блестящий дизайн!
простой пример использования Spliterator
// read file into stream, try-with-resources try (Stream<String> stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator<String> split = stream.skip(1).spliterator(); Chunker<String> chunker = new Chunker<String>(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker<T> { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toString()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } }ответ Брюса более полный, но я искал что-то быстрое и грязное, чтобы обработать кучу файлов.
чистой Java 8 пример, который работает с параллельными потоками, а также.
Как использовать:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed(); CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));объявление и реализация метода:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor) { List<ElementType> newBatch = new ArrayList<>(batchSize); stream.forEach(element -> { List<ElementType> fullBatch; synchronized (newBatch) { if (newBatch.size() < batchSize) { newBatch.add(element); return; } else { fullBatch = new ArrayList<>(newBatch); newBatch.clear(); newBatch.add(element); } } batchProcessor.accept(fullBatch); }); if (newBatch.size() > 0) batchProcessor.accept(new ArrayList<>(newBatch)); }
Comments