私はKafkaによってキーでソートされたデータを自分のSpark Streamingパーティションに持っています。つまり、あるノードで見つかったキーは他のノードでは見つかりません。おそらくcombineByKeyを使って、Spark Streamingのパーティション内でのみ `reduce`する方法はありますか?
状態エンジンとしてredisとそのincrby
(increment by)コマンドを使用し、redisに送信されるリクエスト数を減らしたいと思っています。各ワーカーノードでワードカウントを部分的に減らしたいと思いますそれだけで。 (キーは、単語数から自分の機能を得るためのtag + timestampです)。 シャッフルを避け、redisがワーカーノード間でデータを追加できるようにしたいと思います。
データが正常にワーカーノードに分割されていることを確認しても、.reduce(_ + _)
(Scalaの構文)は、HashPartitionerが自分のデータをランダムなノードを追加してそこに追加します。
スパークストリーミングでScalaのシャッフルステップを起動せずに、各パーティショナーで簡単な単語数削減を書くにはどうすればよいですか?
注意DStreamオブジェクトには、一部のRDDメソッドがありません。これはtransform
メソッドでのみ使用できます。
combineByKey
を使用することができるようです。 mergeCombiners()
のステップをスキップして、蓄積されたタプルをそのまま残しておきたいと思います。 著書「学習スパークは」enigmatically言う:我々は我々のデータは、そこから利益を得ないであろうことを知っていれば我々はcombineByKey()にマップ側の集約を無効にすることができ
。たとえば、groupByKey()は集計関数(リストに追加)がスペースを節約しないため、map-side集計を無効にします。マップ側の組み合わせを無効にしたい場合は、パーティショナーを指定する必要があります。今のところ、rdd.partitionerを渡すことで、ソースRDDでパーティショナーを使用することができます。
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
本は、その後、これを行う方法のための構文を供給しないし続け、また私は、これまでのGoogleとの任意の運を持っていました。
私が知る限り、パーティショナーはSpark StreamingのDStream RDD用に設定されていないので、データをシャッフルしないcombineByKeyにパーティショナーを与える方法はわかりません。
「地図側」は実際にはどういう意味ですか、mapSideCombine = false
にはどんな結果がありますか。
combineByKey
のためのScalaの実装はcombineByKeyWithClassTag
ため https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ルックで見つけることができます。
解決策にカスタムパーティショナーが含まれている場合は、そのパーティショナーを着信DStreamに適用する方法のコードサンプルも含めてください。
Kafkaから 'DStream'を読み込むと、KafkaパーティションとSparkパーティションの間に1:1の対応があります。しかし、同じワーカー・ノードが常に同じパーティションを読み取るという保証はありません。つまり、すべてのキーが単一のスパーク・ワーカー・ノードに終わる保証はありません。したがって、データをシャッフルする必要があります。 –
ご意見ありがとうございます。私はredisとインクリメントby incrbyコマンドをステートエンジンとして使用する予定ですので、これは問題ではありません。各ワーカーノードにあるものだけを減らすことができれば、redisに送信されるメッセージを減らすのに役立ちます。どうやってやるの? – Andreas