8

など。使用可能なすべてのエグゼキュータとそのそれぞれのマルチスレッディング能力の一覧を取得する必要があります(マルチスレッドの総容量ではなく、sc.defaultParallelismがすでにそれを処理しています)。タスクノード上のエグゼキュータのコア数を取得する方法は?

このパラメータはインプリメンテーション依存(YARNとspark-standaloneはコア割り当ての戦略が異なります)と状況(動的割り当てと長期ジョブ実行のために変動する可能性があるため)です。私はこれを推定するために他の方法を使用することはできません。分散変換でSpark APIを使用してこの情報を取得する方法はありますか?多数のパーティション(>> defaultParallelism)と1段ジョブを実行し、数を数える)

1:(例:TaskContext、SparkEnv)

UPDATEはスパーク1.6に関しては、私は次の方法を試してみました各executorIDための独特のthreadIDs:

val n = sc.defaultParallelism * 16 
sc.parallelize(n, n).map(v => SparkEnv.get.executorID -> Thread.currentThread().getID) 
.groupByKey() 
.mapValue(_.distinct) 
.collect() 

各スパークexecutorがオーバープロビジョニングスレッドプールを使用していますので、これはしかし、実際のマルチスレッド処理能力よりも高い推定につながります。

2)n = defaultParallesimを除いて、すべてのタスクで、リソースネゴシエーターが不均衡なシャーディングを起こさないように遅延を追加します(高速ノードはタスクを完了し、低速ノードが実行を開始する前にさらに要求します)。

val n = sc.defaultParallelism 
sc.parallelize(n, n).map{ 
    v => 
    Thread.sleep(5000) 
    SparkEnv.get.executorID -> Thread.currentThread().getID 
} 
.groupByKey() 
.mapValue(_.distinct) 
.collect() 

それはほとんどの時間を動作しますが、必要以上に遅く、非常に不均衡なクラスタまたはタスクの投機によって破壊することができます。

3)私はこれを試していません:BlockManager.numUsableCoresを読むためにJavaのリフレクションを使用してください。これは明らかに安定した解決策ではなく、内部実装はいつでも変更される可能性があります。

より良いものを見つけたら教えてください。

+0

ありがとうポール、これはスカラーのため、夜遅く投稿して捜査を書き留めませんでした。後で追加されます – tribbloid

+1

@Paulが更新されました。十分ですか? – tribbloid

+0

それよりもずっとよく見えます。 – Paul

答えて

2

スパークレストAPIでかなり簡単です。

val applicationId = spark.sparkContext.applicationId 

UI URL:あなたは、アプリケーションIDを取得する必要があります

val baseUrl = spark.sparkContext.uiWebUrl 

とクエリ:

val url = baseUrl.map { url => 
    s"${url}/api/v1/applications/${applicationId}/executors" 
} 

(すでにhttps://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clientsから適応スパークの依存関係、中)のApache HTTPライブラリーと:

import org.apache.http.impl.client.DefaultHttpClient 
import org.apache.http.client.methods.HttpGet 
import scala.util.Try 

val client = new DefaultHttpClient() 

val response = url 
    .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption) 
    .flatMap(response => Try{ 
    val s = response.getEntity().getContent() 
    val json = scala.io.Source.fromInputStream(s).getLines.mkString 
    s.close 
    json 
    }.toOption) 

とjson4s:限り、あなたは、あなたが任意のタスクから同じことを行うことができ、外部接続に手やオープンUIポートでアプリケーションIDとUIのURLを保つよう

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 
implicit val formats = DefaultFormats 

case class ExecutorInfo(hostPort: String, totalCores: Int) 

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try { 
    parse(json).extract[List[ExecutorInfo]] 
}.toOption) 

+0

お返事ありがとうございました!数週間待機しましょう。慎重に使用しないとアンチパターンになる可能性があります。スパークマスターは何千ものノードを管理し、そのUIはあまり効率的ではないデータによってDDoSされるように設計されていませんシリアライゼーションプロトコル。 – tribbloid

2

ウェブUIと同様の方法でSparkListenerを実装しようとします。 This codeが参考になります。

+0

良いアイデア! Spark 1.6では、これはExecutorInfoが読み取り可能な唯一の場所なので、試してみる価値があります。唯一の欠点は、リスナーがドライバでのみ起動されるため、実行がローカルではないことです。 – tribbloid

関連する問題