2016-08-02 10 views
0

ストリーム全体をキャッシュせずに、ストリーム(たとえば、7番目の要素を置換する、最後の要素を削除する、などの隣接する重複を削除するなど)で複雑な操作を行うメソッドを作成したいとします。java 8ストリーム:複雑なストリーム処理

しかし、どのストリームAPIを使用してこのメ​​ソッドをプラグインできますか?収集中にアイテムを他のストリームに放出する自分のコレクターを作成する必要がありますか?データフローの方向はプルからプッシュに変わるでしょうか?

このような方法の可能な署名は何ですか?結果だけ

別のアイデア全体の入力ストリームを収集した後に返される可能性があるため

Stream<T> process(Stream<T> in) 

は(シングルスレッドコードで)おそらく不可能です:

void process(Stream<T> in, Stream<T> out) 

をまた、Javaので、少し欠陥があると思われます(outパラメータとして提供される)既存のストリームに項目を挿入することはできません。

どうすればJavaで複雑なストリーム処理を行うことができますか?

+1

あなたの例でストリーミングされませコレクションを持つことが必要です。ここでは

が変更されたコードです、再度隣接する複製は不可能です。私はあなたがどうにかして、例えば、 7番目の要素のみ。 –

+0

まあ、 'distinct'はどういうわけか隣接する重複を削除するので、明らかに可能です。しかし、最後の要素を削除することが適切に定義されていない可能性があることに同意します。 – piotrek

+0

'distinct'は単純なアルゴリズムで、Linuxのuniqコマンドと同じように動作します。あなたがする必要があるのは、以前見た価値を把握することだけです。現在の値が異なる場合は、前の値として記録します。同じ場合は、この要素をスキップして続けます。多くの場合、あなたはいつでも2つの連続した要素を見ています。あなたの要件は 'Stream'についての仮定を真実ではないかもしれないし、あなたが' Stream'を処理するまで見つけることができません。 – nickb

答えて

1

filtermapreduceなどの標準ストリーム操作を呼び出して返すだけで、いくつかの複雑な操作を行うことができます。外部データを必要とするもの例えば、filterAdjacentDuplicatesreplaceNthElementは、このように実装することができます

public static <T> Stream<T> filterAdjacentDupes(Stream<T> stream) { 
    AtomicReference<T> last = new AtomicReference<>(); 
    return stream.filter(t -> ! t.equals(last.getAndSet(t))); 
} 

public static <T> Stream<T> replaceNthElement(Stream<T> stream, int n, T repl) { 
    AtomicInteger count = new AtomicInteger(); 
    return stream.map(t -> count.incrementAndGet() == n ? repl : t); 
} 

使用例:コメントで述べたように

List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo"); 
replaceNthElement(filterAdjacentDupes(lst.stream()), 3, "BAR").forEach(System.out::println); 
// Output: foo bar BAR foo 

しかし、これはストリームAPIを使用することになっているか、実際にはありません。特に、これら2つのような操作は、並列ストリームが与えられると失敗します。

+2

入力ストリームがパラレルの場合、 'filterAdjacentDupes'と' replaceNthElement'の両方が壊れていることに注意してください。 – Tunaki

+2

ストリームのドキュメンテーションは、ステートフルな非終端操作に対してアドバイスすることに注意してください。 (この回答のように)そのアドバイスを無視することは可能ですが、結果は言語によって定義されていません。したがって、これがすべての実装または将来のバージョンで動作するという保証はありません。 – sprinter

+0

これはおそらく完璧ではないことに同意しますが、OPの要件には適切かもしれません。しかし、警告をありがとう。 –

4

例として使用する複雑な操作はすべて、ストリーム内の他の要素に応じて、ストリーム内の1つの要素に対する操作のパターンに従います。 Javaストリームは、コレクションや縮小なしでこれらのタイプの操作を許可しないように特別に設計されています。ストリーム操作では他のメンバーに直接アクセスすることはできません。一般に、副作用のある非終端操作は悪い考えです。

Streamのjavadocから、次のいくつかの表面的な類似性を保有する一方で、

コレクションとストリームは、異なる目的を持っています。コレクションは主に、要素の効率的な管理とアクセスに関係しています。対照的に、ストリームは要素に直接アクセスしたり操作したりする手段を提供せず、代わりにそのソースと、そのソースで集約して実行される計算操作を宣言的に記述することに関係します。

具体的に:

ほとんどのストリーム操作は、ユーザー指定の振る舞いを記述するパラメータを受け入れ...正しい動作を維持するには、これらの動作パラメータ:

は干渉しないでください(ストリームソースは変更されません)。多くの場合、 はステートレスでなければなりません(その結果は、ストリームパイプラインの実行中に変更される可能性のある状態に依存してはいけません)。ストリーム操作に行動パラメーターがステートフルである場合に

ストリームパイプラインの結果が非決定的か間違っている可能性があります。ステートフルラムダ(又は適切な機能インタフェースを実装する他のオブジェクト)は、その結果、任意の状態に依存するストリームパイプラインの実行中に変更される可能性があり

itermediateと端末ステートレスとステートフル動作のすべて複雑さが十分であるものですhttps://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.htmlおよびhttp://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

この方法には長所と短所があります。重要な利点は、ストリームの並列処理を可能にすることです。重要な欠点は、他の言語では簡単な操作(ストリームの3番目の要素をスキップするなど)はJavaでは困難であることです。

ストリーム操作のビヘイビアパラメータがステートレスであるべきであるというアドバイスを無視する多くのコード(SOの回答を含む)があります。動作させるには、このコードは言語仕様で定義されていないJava実装の動作に依存します。つまり、ストリームは順番に処理されます。 はありません。は、Java処理要素の実装を逆順またはランダムな順序で停止します。そのような実装は、ステートフルなストリーム操作が直ちに異なる振る舞いをするようにします。ステートレスな操作はまったく同じように動作し続けます。要約すると、ステートフルなオペレーションは、の実装ではなく、Javaのの実装の詳細に依存しています。

安全なステートフルな中間操作を行うことも可能です。彼らは、要素が処理される順序に特に依存しないように設計する必要があります。 Stream.distinctおよびStream.sortedがこれの良い例です。彼らは働くために状態を維持する必要がありますが、要素が処理される順序に関係なく機能するように設計されています。

質問に答えるために、これらのタイプの操作はJavaでは可能ですが、単純で安全ではありません(前の段落で説明した理由で)。私はリダクションやコレクションを使用することを提案しています(Tagir Valeevの答えを参照)。スプライテータを使用して新しいストリームを作成することをお勧めします。あるいは、従来の反復を使用します。

+0

*例として使用する複雑な操作はすべて、ストリーム内の他の要素に応じて、ストリーム内の1つの要素に対する操作のパターンに従います*要素の「置き換え」は、ステートレスマップ操作として実装できます。 * Javaストリームは、コレクションやリダクションなしでこれらのタイプの操作を許可しないように特別に設計されています*標準ストリームAPIは 'distinct()'や 'sorted() 'のようなステートフルな操作を提供します。別の – shmosel

+0

@shmosel https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.htmlのすべての詳細を再解読しないように、私の答えを簡略化しました。ステートフルな中間操作が終了することは間違いありません。その文書に記載されているように、ステートフルな操作(ソート済み)は、動作するためにストリーム全体を収集する必要があるかもしれない。それは私が「集団や減少なし」という意味のものです。私は少し明確にするために私の答えを編集します。 – sprinter

1

これを行うには正しい方法ではありませんが、自分でSpliteratorを書きます。一般的なアルゴリズムは以下の通りです:

  1. は、いくつかの追加の操作を行って、おそらく進める際に、既存のものの要素を消費する可能性がある独自のSpliteratorを書くstream.spliterator()
  2. を使用して、既存のストリームSpliteratorしてください。
  3. .onClose(stream::close)のような元のストリームにStreamSupport.stream(spliterator, stream.isParallel())
  4. 委任close()呼び出しを介して、あなたのspliteratorに基づいて新しいストリームを作成します。

よく平行化する優れたスプライテータを書くことは、しばしば非常に重要ではありません。しかし、並列化を気にしなければ、より簡単なサブクラスAbstractSpliteratorを作成することができます。ここで指定した位置にある要素を削除し、新たなストリーム操作を作成する方法の例です:

public static <T> Stream<T> removeAt(Stream<T> src, int idx) { 
    Spliterator<T> spltr = src.spliterator(); 
    Spliterator<T> res = new AbstractSpliterator<T>(Math.max(0, spltr.estimateSize()-1), 
      spltr.characteristics()) { 
     long cnt = 0; 

     @Override 
     public boolean tryAdvance(Consumer<? super T> action) { 
      if(cnt++ == idx && !spltr.tryAdvance(x -> {})) 
       return false; 
      return spltr.tryAdvance(action); 
     } 
    }; 
    return StreamSupport.stream(res, src.isParallel()).onClose(src::close); 
} 

これは、最小限の実装であり、より良いパフォーマンスと並列性を示すように改善することができます。

私のStreamExライブラリでは、headTailを介してこのようなカスタムストリーム操作の追加を簡素化しようとしました。ここでStreamExを使用して同じことをする方法は次のとおりです。

public static <T> StreamEx<T> removeAt(StreamEx<T> src, int idx) { 
    // head is the first stream element 
    // tail is the stream of the rest elements 
    // want to remove first element? ok, just remove tail 
    // otherwise call itself with decremented idx and prepend the head element to the result 
    return src.headTail(
     (head, tail) -> idx == 0 ? tail : removeAt(tail, idx-1).prepend(head)); 
} 

あなたもchain()方法で連鎖をサポートすることができます:

public static <T> Function<StreamEx<T>, StreamEx<T>> removeAt(int idx) { 
    return s -> removeAt(s, idx); 
} 

使用例:

StreamEx.of("Java 8", "Stream", "API", "is", "not", "great") 
     .chain(removeAt(4)).forEach(System.out::println); 

が最後にさえなしheadTailがあることに注意してくださいStreamExを使用して問題を解決するいくつかの方法。あなたが増えてジッパーがあり、特定のインデックスに削除するには、このようにインデックスをフィルタリングし、ドロップ:

StreamEx.of(stream) 
     .zipWith(IntStreamEx.ints().boxed()) 
     .removeValues(pos -> pos == idx) 
     .keys(); 

collapse方法を捧げていますが、隣接する繰り返しを折りたたむには(それも非常によく並列化!):

StreamEx.of(stream).collapse(Object::equals); 
0

tobias_kの答えと考え方を構築してthis question/update 2で表現すると、ローカル変数をキャプチャする適切な述語関数とマップ関数を返すことがあります。 (これらの関数は結果としてステートフルであり、ストリームには理想的ではありませんが、ストリームAPIのdistinct()メソッドはおそらくステートフルです)。 - あなたは最後のものを削除できる要素の数がわからない場合は、ストリームは、複数のスレッドによって処理することができ

public class Foo { 
    public static void run() { 
     List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo"); 
     lst.stream() 
       .filter(Foo.filterAdjacentDupes()) 
       .map(Foo.replaceNthElement(3, "BAR")) 
       .forEach(System.out::println); 
     // Output: foo bar BAR foo 
    } 

    public static <T> Predicate<T> filterAdjacentDupes() { 
     final AtomicReference<T> last = new AtomicReference<>(); 
     return t -> ! t.equals(last.getAndSet(t)); 
    } 

    public static <T> UnaryOperator<T> replaceNthElement(int n, T repl) { 
     final AtomicInteger count = new AtomicInteger(); 
     return t -> count.incrementAndGet() == n ? repl : t; 
    } 
} 
関連する問題