2016-09-29 6 views
0

私はKafkaによってキーでソートされたデータを自分のSpark Streamingパーティションに持っています。つまり、あるノードで見つかったキーは他のノードでは見つかりません。おそらくcombineByKeyを使って、Spark Streamingのパーティション内でのみ `reduce`する方法はありますか?

状態エンジンとしてredisとそのincrby(increment by)コマンドを使用し、redisに送信されるリクエスト数を減らしたいと思っています。各ワーカーノードでワードカウントを部分的に減らしたいと思いますそれだけで。 (キーは、単語数から自分の機能を得るためのtag + timestampです)。 シャッフルを避け、redisがワーカーノード間でデータを追加できるようにしたいと思います。

データが正常にワーカーノードに分割されていることを確認しても、.reduce(_ + _)(Scalaの構文)は、HashPartitionerが自分のデータをランダムなノードを追加してそこに追加します。

スパークストリーミングでScalaのシャッフルステップを起動せずに、各パーティショナーで簡単な単語数削減を書くにはどうすればよいですか?

注意DStreamオブジェクトには、一部のRDDメソッドがありません。これはtransformメソッドでのみ使用できます。

combineByKeyを使用することができるようです。 mergeCombiners()のステップをスキップして、蓄積されたタプルをそのまま残しておきたいと思います。 著書「学習スパークは」enigmatically言う:我々は我々のデータは、そこから利益を得ないであろうことを知っていれば我々はcombineByKey()にマップ側の集約を無効にすることができ

。たとえば、groupByKey()は集計関数(リストに追加)がスペースを節約しないため、map-side集計を無効にします。マップ側の組み合わせを無効にしたい場合は、パーティショナーを指定する必要があります。今のところ、rdd.partitionerを渡すことで、ソースRDDでパーティショナーを使用することができます。

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

本は、その後、これを行う方法のための構文を供給しないし続け、また私は、これまでのGoogleとの任意の運を持っていました。

私が知る限り、パーティショナーはSpark StreamingのDStream RDD用に設定されていないので、データをシャッフルしないcombineByKeyにパーティショナーを与える方法はわかりません。

「地図側」は実際にはどういう意味ですか、mapSideCombine = falseにはどんな結果がありますか。

combineByKeyのためのScalaの実装はcombineByKeyWithClassTagため https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ルックで見つけることができます。

解決策にカスタムパーティショナーが含まれている場合は、そのパーティショナーを着信DStreamに適用する方法のコードサンプルも含めてください。

+0

Kafkaから 'DStream'を読み込むと、KafkaパーティションとSparkパーティションの間に1:1の対応があります。しかし、同じワーカー・ノードが常に同じパーティションを読み取るという保証はありません。つまり、すべてのキーが単一のスパーク・ワーカー・ノードに終わる保証はありません。したがって、データをシャッフルする必要があります。 –

+0

ご意見ありがとうございます。私はredisとインクリメントby incrbyコマンドをステートエンジンとして使用する予定ですので、これは問題ではありません。各ワーカーノードにあるものだけを減らすことができれば、redisに送信されるメッセージを減らすのに役立ちます。どうやってやるの? – Andreas

答えて

0

mapPartitionsは、入力RDDのイテレータを1つのパーティションにマップし、出力RDD上のイテレータにマップする機能を使用して実行できます。

ワードカウントを実装するために、私はその後、出力RDDを形成するためにイテレータに変換されるmutable.hashMapを、初期化、カフカのキーを削除しfoldLeftを用いた高速イテレータのワードカウントを実行するために_._2にマップされます。

val myDstream = messages 
    .mapPartitions(it => 
    it.map(_._2) 
    .foldLeft(new mutable.HashMap[String, Int])(
     (count, key) => count += (key -> (count.getOrElse(key, 0) + 1)) 
    ).toIterator 
) 
関連する問題