2016-04-03 24 views
0

私は分散マップドローイングプログラムを次のデザインを持つJava 8 map-reduceフレームワークを使ってJavaで書こうとしています:Javaのソケットを介して(パラレル)ストリームを送信

クライアント3つのマッパー(それぞれ異なるマシン/スタンドアロンJavaアプリケーション)にデータを送信します。マッパーは、データのリストからparallelStream()を作成して、データを並行して処理します。

各マッパーは、parallelStreamで.map(...)を呼び出す必要があります。次に、マッピングされたデータを別のノードReducerに送信するという考え方です。

リデューサーはStreamを取得し、.reduce(...)を呼び出し、最後に.get()を呼び出してクライアントに返されます。

私のプログラムは、同じプログラムで.map(...).reduce(...).get()を呼び出すと機能しますが、私は別のレデューサーノードを持つことができます。

ソケットプログラミングで新しく、またJava 8を使用しているときに、 "java.io.NotSerializableException:java.util.stream.ReferencePipeline $ 3"をスローするため、ソケットを介してストリームを送信する際に問題があります。 WriteObjectでストリームを書き込もうとする瞬間です。

ここに進むにはどうすればよいですか?ストリームを別のものに変えて送信してから、Reducerノードでストリームを再びストリームにすることはできますか? ObjectOutputStreamよりもストリームを送信する方が良いでしょうか?

どのようなアイデアも非常に高く評価されています。どうもありがとうございました!

P .:ストリームはStream<Map<String, Integer>>です。

+4

ストリームを通常のコレクションまたは配列にダンプする必要があります。ストリーム自体は送信できません。それは計算であり、データではありません。 –

+1

それを明確にしていただきありがとうございます。私がJava 8の仕組みを理解していれば、map()。 .reduce()は同じマシンで同時に動作するはずですか? 私がやったことは、マップを作成するためにreduceとも呼ばれているのですか?またマップを3つのマップを結合する「減速機」に送りました。これは良い練習ですか? –

+0

あなたが 'Stream'を使って何をしているのかは、そのマシンにとってローカルであり、あなたがそのマシンで何をしたいのかを、あなたが他の場所に送る前に終了したはずです。 –

答えて

0

1つのアプローチは、データをソケットにプッシュするforEachを使用してマップノードを終了することです。コレクションが非常に大きく(または理論的に無限に)なる可能性がある場合、この手法はコレクション手法よりも優れています。スペース効率が良く、バッファリングされ、ダウンストリームノードは収集プロセスの完了を待ってアイドル状態ではありません。

次に、Spliterator(extends AbstractSpliterator)でreduceノードのソケットリーダーをラップします。 SpliteratorのtryAdvanceメソッドは、ソケットからデータを読み取り、提供された呼び出し元を介してストリームに利用可能にします。 tryAdvanceは、データがなくなるとfalseを返します(ストリーム終了マーカー、ソケットのストリーム終了、またはソケット例外)。 AbstractSpliterator.trySplitは、限られた並列性を実装しています。

Spliteratorの実装からストリームを構築するには、StreamSupport.stream(Spliterator spliterator、boolean parallel)を使用します。 reduce操作では、このストリームからデータを取得します。

あなたはソケットを保持することができ、ストリーム終わりのマーカーはメッセージ終わりのマーカーのようになります(パイプラインのバッチブタを思い出させます)。

関連する問題