Считывание из потока ввода Java с очень большими объемами данных несколько раз
Я хотел бы знать, как лучше всего считывать байты из Java InputStream несколько раз и все равно быть эффективным, когда поток очень большой.
Допустим, у меня есть следующий код:
public void handleBytes(InputStream in) {
doStuff1(in);
doStuff2(in);
doStuff3(in);
}
Где doStuff1, doStuff2 и doStuff3 все должны работать с одними и теми же байтами, но делать разные вещи. Кроме того, я предполагаю, что эти функции могут быть асинхронными.
Я знаю, что можно mark, а затем reset поток, но мне интересно, если это путь, Когда in имеет много данных. Кроме того, если я хочу иметь работника потока на doStuff-X, я не могу действительно использовать reset.
Должен ли я иметь копию потока для каждого метода doStuff-X? Но опять же я не уверен, что это будет эффективно для больших объемов данных.
4 ответов:
Если вы знаете, что три функции doStuff() выполняются асинхронно, то вы можете попробовать использовать Apache Commons IO TeeInputStream для копирования содержимого начального входного потока в PipedOutputStream, который подключен к PipedInputStream, который читается doStuff2(). Аналогично вы можете настроить второй TeeInputStream, построенный с использованием второго PipedOutputStream, подключенного ко второму PipedInputStream для doStuff3().
Есть некоторые ограничения на этот подход:
1) doStuff1 (), doStuff2 () и doStuff3 () должны выполняться в отдельных потоках, иначе вы будете буферизировать весь файл дважды, пока выполняется doStuff1 () и до запуска doStuff2 () и doStuff3 (). Этот подход предполагает, что doStuff2() и doStuff3() читают и обрабатывают данные, в то время как doStuff1() читает данные изначально.
2) doStuff1 () не может использовать use skip (), mark() или reset (), так как это испортит нижестоящие функции (как описано выше в TeeInputStream javadoc.
Этот подход должен быть достаточно эффективным для памяти, если все три функции doStuff () могут обрабатывать данные примерно с одинаковой скоростью.
Вы можете прочитать входной поток только один раз без буферизации всего входного потока.
Вы можете загрузить его в память, если это ГБ или около того, или скопировать его в файл и воспроизвести его, если у вас есть много Гб. Если вы можете проанализировать данные в одном потоке, вы можете передать их в другие потоки.
Вообще говоря, это кажется плохой идеей.
markне гарантируется, что он вообще поддерживается потоком, и даже если он поддерживается, вы должны указать предел, сколько байтов может быть прочитано до вызоваreset.Поскольку вы упомянули, что эти
dostuffмогут выполняться асинхронно, почему бы не запустить поток для каждого из них и не использовать очереди для подачи входных данных из основного потока в эти три очереди одновременно? Это требует некоторой синхронизации, но таким образом у вас нет ограничений на входной объем и все еще может ограничить использование памяти.
Можно использовать PipedOutputStream и PipedInputStream.
static class Task extends Thread{ private final String taskName; private final BufferedInputStream input; public Task(String taskName, PipedInputStream input){ this.taskName = taskName; this.input = new BufferedInputStream( input); } public void run(){ try { System.out.println("Thread "+this.taskName+" Start"); final byte buf[] = new byte[8]; // 8 bytes for demo while(true){ if( input.available() > 0){ input.read(buf); System.out.println(String.format("Task Name %s, read:%s", this.taskName, new String(buf))); } else{ // TODO: Set break Condition:Ex: Check the expected read size Thread.sleep(1000); } } } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } } } public static void main(String args[]) { try{ final PipedInputStream input1 = new PipedInputStream(); final PipedInputStream input2 = new PipedInputStream(); final PipedInputStream input3 = new PipedInputStream(); final Task t1 = new Task("Task1", input1); final Task t2 = new Task("Task2", input2); final Task t3 = new Task("Task3", input3); t1.start(); t2.start(); t3.start(); Thread.sleep(300); InputStream input = null; try{ input = new FileInputStream("LargeInputFile.txt"); final PipedOutputStream out1 = new PipedOutputStream(input1); final PipedOutputStream out2 = new PipedOutputStream(input2); final PipedOutputStream out3 = new PipedOutputStream(input3); byte buf[] = new byte[8]; // 8 bytes for demo while(true){ if(input.available()>0){ int size = input.read(buf); if(size > 0){ out1.write(buf); out2.write(buf); out3.write(buf); out1.flush(); out2.flush(); out3.flush(); } } else{ System.out.println("Rread is finished!"); break; } } } finally{ if(input!=null){ input.close(); } } t1.join(); t2.join(); t3.join(); } catch(Exception e){ e.printStackTrace(System.err); } }
Comments