など。使用可能なすべてのエグゼキュータとそのそれぞれのマルチスレッディング能力の一覧を取得する必要があります(マルチスレッドの総容量ではなく、sc.defaultParallelismがすでにそれを処理しています)。タスクノード上のエグゼキュータのコア数を取得する方法は?
このパラメータはインプリメンテーション依存(YARNとspark-standaloneはコア割り当ての戦略が異なります)と状況(動的割り当てと長期ジョブ実行のために変動する可能性があるため)です。私はこれを推定するために他の方法を使用することはできません。分散変換でSpark APIを使用してこの情報を取得する方法はありますか?多数のパーティション(>> defaultParallelism)と1段ジョブを実行し、数を数える)
1:(例:TaskContext、SparkEnv)
UPDATEはスパーク1.6に関しては、私は次の方法を試してみました各executorIDための独特のthreadIDs:
val n = sc.defaultParallelism * 16
sc.parallelize(n, n).map(v => SparkEnv.get.executorID -> Thread.currentThread().getID)
.groupByKey()
.mapValue(_.distinct)
.collect()
各スパークexecutorがオーバープロビジョニングスレッドプールを使用していますので、これはしかし、実際のマルチスレッド処理能力よりも高い推定につながります。
2)n = defaultParallesimを除いて、すべてのタスクで、リソースネゴシエーターが不均衡なシャーディングを起こさないように遅延を追加します(高速ノードはタスクを完了し、低速ノードが実行を開始する前にさらに要求します)。
val n = sc.defaultParallelism
sc.parallelize(n, n).map{
v =>
Thread.sleep(5000)
SparkEnv.get.executorID -> Thread.currentThread().getID
}
.groupByKey()
.mapValue(_.distinct)
.collect()
それはほとんどの時間を動作しますが、必要以上に遅く、非常に不均衡なクラスタまたはタスクの投機によって破壊することができます。
3)私はこれを試していません:BlockManager.numUsableCoresを読むためにJavaのリフレクションを使用してください。これは明らかに安定した解決策ではなく、内部実装はいつでも変更される可能性があります。
より良いものを見つけたら教えてください。
ありがとうポール、これはスカラーのため、夜遅く投稿して捜査を書き留めませんでした。後で追加されます – tribbloid
@Paulが更新されました。十分ですか? – tribbloid
それよりもずっとよく見えます。 – Paul