2016-03-18 11 views
2

Java InputStreamからバイトを読み込み、ストリームが非常に大きい場合でも効率的な方法が何であるか知りたいと思います。 doStuff1doStuff2doStuff3すべて同じバイトで作業する必要があるが、別のことを行う非常に大量のデータを複数回Java InputStreamから読み取る

public void handleBytes(InputStream in) { 
    doStuff1(in); 
    doStuff2(in); 
    doStuff3(in); 
} 

: のは、私は次のコードがあるとしましょう。また、私はこれらの関数が非同期であると仮定しています。

は、私はそれがmarkすることが可能であることを知っているし、その後resetストリームが、これはinは、大量のデータを持っているときに移動するための方法である場合、私は疑問に思って。 doStuff-Xごとにスレッドワーカーを用意したい場合は、実際にresetを使用することはできません。

doStuff-Xメソッドごとにストリームのコピーを用意する必要がありますか?しかし、再び大量のデータが効率的になるかどうかはわかりません。

+0

あなたはdoStuffの方法は何をすべきかについての詳細な情報を提供する必要があります。 – Raedwald

+0

入力ストリームが大きすぎない場合は、バイトのマスターセットに対してさまざまなプロセスを実行できる場合は、そのバイトをメモリにロードしてください。 – ManoDestra

答えて

1

あなたは3 doStuff()関数は非同期で実行されていることを知って場合は、doStuff2(によって読み込まれているPipedInputStreamに接続されているPipedOutputStreamに初期InputStreamの内容をコピーするApache Commons IO TeeInputStreamを使用して試みることができます)。同様に、doStuff3()の2番目のPipedInputStreamに接続された2番目のPipedOutputStreamを使用して構築された2番目のTeeInputStreamを設定することもできます。

このアプローチにはいくつかの制限があります。

1)doStuff1()、doStuff2()とdoStuff3は、()別のスレッドで実行されている必要がありさもなければ、あなたは(doStuff1ながら二回ファイル全体をバッファリングします)が実行されていますdoStuff2()およびdoStuff3()が実行される前に実行されます。このアプローチでは、doStuff1()が最初にデータを読み込んでいる間に、doStuff2()およびdoStuff3()がデータを読み込んで処理していることを前提としています。

2)doStuff1())(スキップマーク()を使用を使用するか、TeeInputStreamのJavaDocに概説されるように(この意志台無しとして()下流の機能をリセットすることができない。

このアプローチを合理的でなければなりませんメモリ効率的な3つのすべてのdoStuff()関数は、ほぼ同じ速度でデータを処理できるように長い。

+0

これは私が望むものを達成するようです。しかし、doStuff1がデータを最初に読み込んでいる間にdoStuff2とdoStuff3がデータを読み込んで処理していると仮定していると言うとき、最初の点ではどういう意味ですか?それはなぜですか? – DrChess

+0

3つの関数が非同期に実行されていない場合(つまり別のスレッドで)、doStuff1()が完了するまでdoStuff2()およびdoStuff3()は実行されません。 doStuff1が実行されている間に実行されておらず、PipedInputStreamから読み込んでいる場合、データはパイプに蓄積されます(実際にはディスクに書き込まれる可能性がありますが、メモリに格納されます。 2本のパイプがあるので、2倍のデータを保存します。あなたはデータが "非常に大きい"と指示したので、私はそれをメモリに2つコピーしたくないと仮定しています。 –

+0

そのため、doStuff1()の実行中にdoStuff2()/ doStuff3()関数を実行することが重要です。 doStuff1()がパイプにデータを追加している間、それらの2つの関数(doStuff2()/ doStuff3())はパイプから同時に読み込み(データをクリアする)ようになっています。それは役に立ちますか? –

1

入力全体をバッファリングすることなく、InputStreamを1回だけ読み取ることができます。

GB以上のメモリを搭載している場合はメモリにロードしたり、GB以上の場合はファイルにコピーして再生したりできます。あるスレッドでデータを解析できる場合は、他のスレッドに渡すことができます。

+1

それをファイルにコピーするのは本当に簡単な方法です。しかし、doStuffsメソッドで処理する前に、すべてのバイトをファイルに書き込む必要があります。 – DrChess

1

一般的に言えば、これは悪い考えです。 markは、ストリームによってサポートされることはまったく保証されておらず、サポートされている場合でも、resetが呼び出される前に、読み取ることができるバイト数の制限を指定する必要があります。

dostuffは非同期に実行できると言われているので、それぞれのスレッドを開始し、メインスレッドからの入力を同時に3つのキューに入力するのはなぜですか?何らかの同期が必要ですが、この方法では入力音量に制限はなく、メモリ使用量を制限することができます。

1

あなたが持つPipedOutputStreamと持つPipedInputStreamを採用できました。

static class Task extends Thread{ 
    private final String taskName; 
    private final BufferedInputStream input; 
    public Task(String taskName, PipedInputStream input){ 
     this.taskName = taskName; 
     this.input = new BufferedInputStream(input); 
    } 

    public void run(){ 
     try { 
      System.out.println("Thread "+this.taskName+" Start"); 

      final byte buf[] = new byte[8]; // 8 bytes for demo 
      while(true){ 
       if(input.available() > 0){ 
        input.read(buf); 
        System.out.println(String.format("Task Name %s, read:%s", this.taskName, new String(buf))); 
       } 
       else{ 
        // TODO: Set break Condition:Ex: Check the expected read size 
        Thread.sleep(1000); 
       } 
      } 
     } catch (IOException | InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
    } 
} 
public static void main(String args[]) { 
    try{ 
     final PipedInputStream input1 = new PipedInputStream(); 
     final PipedInputStream input2 = new PipedInputStream(); 
     final PipedInputStream input3 = new PipedInputStream(); 

     final Task t1 = new Task("Task1", input1); 
     final Task t2 = new Task("Task2", input2); 
     final Task t3 = new Task("Task3", input3); 
     t1.start(); 
     t2.start(); 
     t3.start(); 

     Thread.sleep(300); 

     InputStream input = null; 
     try{ 
      input = new FileInputStream("LargeInputFile.txt"); 

      final PipedOutputStream out1 = new PipedOutputStream(input1); 
      final PipedOutputStream out2 = new PipedOutputStream(input2); 
      final PipedOutputStream out3 = new PipedOutputStream(input3); 

      byte buf[] = new byte[8]; // 8 bytes for demo 
      while(true){ 

       if(input.available()>0){ 
        int size = input.read(buf); 

        if(size > 0){ 
         out1.write(buf); 
         out2.write(buf); 
         out3.write(buf); 
         out1.flush(); 
         out2.flush(); 
         out3.flush(); 
        }      
       } 
       else{ 
        System.out.println("Rread is finished!"); 
        break; 
       } 
      } 
     } 
     finally{ 
      if(input!=null){ 
       input.close(); 
      } 
     } 
     t1.join(); 
     t2.join(); 
     t3.join(); 
    } 
    catch(Exception e){ 
     e.printStackTrace(System.err); 
    } 
} 
関連する問題