遅延を回避し、プロセスを高速化するために、私はスパークストリーミングでスレッドプールを構築します。スパークストリーミングプログラムでスレッドプールを構築する
stream.foreachRDD(rdd=> {
rdd.foreachPartition { rddPartition => {
val client: Client = ESClient.getInstance.getClient
var num = Random.nextInt()
val threadPool: ExecutorService = Executors.newFixedThreadPool(5)
val confs = new Configuration()
rddPartition.foreach(x => {
threadPool.execute(new esThread(x._2, num, client, confs))
} ) } } } )
esThreadの機能は、まず、我々はelasticsearchを問い合わせることですし、我々は最終的に我々はHDFSに結果を書き込み、ESのクエリ結果を取得するには、次のようにメインプログラムが表示されます。しかし、HDFSの結果ファイルのデータはあまり多くありませんが、少し残っています。スパークストリーミングでスレッドプールを構築できるのだろうか。スパークストリーミングのスレッドプールでデータが欠落していますか?
ご協力いただきありがとうございます。
答えがありがとう、ありがとうございます.1。親スレッドはパーティションスレッドを参照していますか?2.スレッドスレッドがawaitTerminationをしていないということを意味しますか?threadPool awaitTermination私は得ることができるコンピューティングリソースの限界のために、私はスループットを向上させるために、より多くのスレッドを作成したい。 – mumumuyanyanyan