編集:応答の瞬間はpyspark
だったことはわかりませんでした。しかし、私はそれを適応させる考えとして残しておきます。
私は同様の問題を抱えていましたが、パフォーマンスを改善することはできましたが、それでも私にとって理想的な解決策ではありませんでした。多分あなたのために働くかもしれません。
アイデアは、多くの小さなRDD(ユーザーIDごとに新しいもの)でRDDを分解し、アレイに保存してから、それぞれの "サブRDD"に対して処理機能(あなたのケースではクラスタリング)を呼び出していました。提案されたコードは(コメント欄で説明)以下の通りである:
// A case class just to use as example
case class MyClass(userId: Long, value: Long, ...)
// A Scala local array with the user IDs (Could be another iterator, such as List or Array):
val userList: Seq[Long] = rdd.map{ _.userId }.distinct.collect.toSeq // Just a suggestion!
// Now we can create the new rdds:
val rddsList: Seq[RDD[MyClass]] = userList.map {
userId => rdd.filter({ item: MyClass => item.userId == userId })
}.toSeq
// Finally, we call the function we want for each RDD, saving the results in a new list.
// Note the ".par" call, which is used to start the expensive execution for multiple RDDs at the same time
val results = rddsList.par.map {
r => myFunction(r)
}
私はこれが大体あなたの最初のオプションと同じですけど、.par
コールを使用することによって、私はパフォーマンスを向上させることができました。
この呼び出しは、rddsList
オブジェクトをParSeq
オブジェクトに変換します。この新しいScalaオブジェクトは並列計算を可能にするので、マップ関数は複数のRDDに対して同時にmyFunction(r)
を呼び出してパフォーマンスを向上させるのが理想的です。
並列コレクションの詳細については、Scala Documentationをご確認ください。
どのようなタイプのuserListですか?アレイ?私は 'par'メソッドを見つけようとしています –
userListはスカラローカルイテレータ(配列、リスト、Seq、...) –
まあ、Spark APIの部分ですか?それはネイティブのScala型にどのように結びついていますか? 私がドキュメントで見つけることができる最も近いものはvar rdd = sc.parallelize(data)です。それはあなたがここで言うことと同じことをしません。これについてのdocページを指摘できますか? –