2017-11-12 5 views
0

遅延を回避し、プロセスを高速化するために、私はスパークストリーミングでスレッドプールを構築します。スパークストリーミングプログラムでスレッドプールを構築する

stream.foreachRDD(rdd=> { 
    rdd.foreachPartition { rddPartition => { 
    val client: Client = ESClient.getInstance.getClient 
    var num = Random.nextInt() 
    val threadPool: ExecutorService = Executors.newFixedThreadPool(5) 
    val confs = new Configuration() 
    rddPartition.foreach(x => { 
     threadPool.execute(new esThread(x._2, num, client, confs)) 
    }  )  }  } } ) 

esThreadの機能は、まず、我々はelasticsearchを問い合わせることですし、我々は最終的に我々はHDFSに結果を書き込み、ESのクエリ結果を取得するには、次のようにメインプログラムが表示されます。しかし、HDFSの結果ファイルのデータはあまり多くありませんが、少し残っています。スパークストリーミングでスレッドプールを構築できるのだろうか。スパークストリーミングのスレッドプールでデータが欠落していますか?

ご協力いただきありがとうございます。

答えて

0

パーティションはすでに別のスレッドで処理されており、前のパーティションが終了するまでストリームは次のバッチに進みません。だからあなたに何かを買う可能性は低く、リソース使用量の追跡はあまり透明ではありません。

同時に、あなたのコードがこの時点で実装されているので、データが失われる可能性があります。 threadPoolawaitTerminationではないので、すべてのデータが処理される前に親スレッドが終了することがあります。

全体的には、有用なアプローチではありません。スループットを増やしたい場合は、パーティション数とコンピューティングリソースの量を調整する必要があります。

+0

答えがありがとう、ありがとうございます.1。親スレッドはパーティションスレッドを参照していますか?2.スレッドスレッドがawaitTerminationをしていないということを意味しますか?threadPool awaitTermination私は得ることができるコンピューティングリソースの限界のために、私はスループットを向上させるために、より多くのスレッドを作成したい。 – mumumuyanyanyan