2016-07-27 4 views
4

私は最近Spark Streamingアプリをテストしています。ストレステストは約20000メッセージ/秒のメッセージサイズを取り込み、Spark Streamingが4秒ごとにバッチを読み取っているKafkaに200バイトから1Kの間で変化します。スパークステートフルストリーミングジョブが、長い稼働時間の後にS3へのチェックポイント時にハングアップする

私たちのSparkクラスタは、スタンドアロンのクラスタマネージャでバージョン1.6.1で動作し、私たちのコードにはScala 2.10.6が使用されています。

約15〜20時間運転、(40秒間隔で行われる)チェックポイントを開始しているエグゼキュータの1の後、次のスタックトレースで立ち往生しているし、完了したことがない:

java.net .SocketInputStream.socketRead0(ネイティブメソッド) java.net.SocketInputStream.socketRead(SocketInputStream.java:116) java.net.SocketInputStream.read(SocketInputStream.java:170) java.net.SocketInputStream.read(SocketInputStream.java :141) sun.security.ssl.InputRecord.readFully(InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593) ) sun.security.ssl.InputRecord.read(InputRecord.java:532) sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java: 1375) sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403) sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory.java:533) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177) org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131) org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610) org.apache.http.impl .client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient .java:82) org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250) org.jets3t .service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120) org.jets3t.service.StorageService.getObjectDetails(StorageService.java :575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174) sun.reflect.GeneratedMethodAccessor32.invoke(不明なソース) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 ) java.lang.reflect.Method.invoke(Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) org.apache.hadoop.io.retry.RetryInvocationHandler .invoke(RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native。$ Proxy18.retrieveMetadata(不明 出所) org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:472) org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424) (ReliableCheckpointRDD.scala:136) org.apache.spark.rdd.ReliableCheckpointRDD $$ anonfun $ writeRDDToCheckpointDirectory $ 1.apply(ReliableCheckpointRDD.scala:136) org.apache.spark .rdd.ReliableCheckpointRDD $$ anonfun $ writeRDDToCheckpointDirectory $ 1.apply(ReliableCheckpointRDD.scala:136) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) org.apache.spark.scheduler.Task.run (Task.scala:89) org.apache.spark.executor.Executor $ TaskRunner .run(Executor.scala:214) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) java.lang。 Thread.run(Thread.java:745)

貼着され、点火ドライバは、着信バッチの処理を継続することを拒否し、タスクを解放するまで処理できないキューに入れられたバッチの巨大なバックログを作成しているが"つまらない"。さらに、より

streaming-job-executor-0の下にドライバスレッドダンプを見ていることは明確にこのタスクが完了するのを待っていることを示しています

java.lang.Object.wait(ネイティブメソッド) java.lang.Object上位.wait(Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73) org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612) org。 apache.spark.SparkContext.runJob(SparkContext.scala:1832) org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) org.apache.spark.SparkCon text.runJob(SparkContext.scala:1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:135) org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:58) (RDD.scala:1682) org。apache.spark.rdd.RDDCheckpointData.checkpoint apache.spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1.apply(RDD.scala:1679) org.apache.spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1.apply(RDD.scala:1679) org。 apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1678) org.apache.spark.rdd.RDD $$ anonfun $ doCheck $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(RDD.scala:1684) org.apache.spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply RDD.scala:1684)は scala.collection.immutable.List.foreach(List.scala:318)

は誰でも、このような問題を経験していますか?

+0

EC2でAWS EMRまたはカスタム管理対象クラスタを使用していますか? Hadoopクラスターの状態が時間とともに衰退するにつれ、EMRは長いストレスを伴う仕事には適していないようです。 https://media.amazonwebservices.com/AWS_Amazon_EMR_Best_Practices.pdf 27-28 – eliasah

+1

@eliasahいいえ、これはEMRではありません。それはSparkクラスタです。スタンドアロンのクラスタマネージャで手動でセットアップしました –

+0

http://stackoverflow.com/q/34879092/1560062と多少似ていますか? – zero323

答えて

2

org.jets3tで使用されているHttpClientライブラリのバグのためにソケットハングが発生します。この場合、SSLハンドシェイクは指定されたタイムアウトを使用しません。問題の詳細hereが見つかります。

このバグは、修正されたv4.5.1より下のHttpClientバージョンで再現されます。残念ながら、Spark 1.6.xはv4.3.2を使用していますが、これには修正は含まれていません。spark.speculation構成設定を経由して

  1. 利用スパークの投機メカニズム:

    は、私がこれまで考えてきた三つの可能な回避策があります。これは、まれに負荷がかかって再現されるので、ハングのエッジケースに役立ちます。これは、sparkがメディアンタスクの実行時間の良い印象を持たないストリーミングジョブの最初に間違いを引き起こす可能性があることに注意してください。ただし、目立った遅延を引き起こすものではありません。

    ドキュメントは言う:

    "真" に設定すると、タスクの投機的実行を行います。つまり、1つまたは複数のタスクが1つのステージでゆっくりと実行されている場合は、 が再起動されます。

    あなたは火花提出するフラグを供給することにより、それをオン:異なる設定の詳細については

    spark-submit \ 
    --conf "spark.speculation=true" \ 
    --conf "spark.speculation.multiplier=5" \ 
    

    を手動でのHttpClient V4.5.1を渡す

  2. Spark Configurationページを参照してください渡すことができますまたはそれ以上をSparksのクラスパスに渡すので、このJARをuber JARの前にロードすることができます。これは、Sparkでのクラスローディングプロセスがちょっと面倒なので少し難しいかもしれません。

    CP=''; for f in /path/to/httpcomponents-client-4.5.2/lib/*.jar; do CP=$CP$f:; done 
    SPARK_CLASSPATH="$CP" sbin/start-master.sh # on your master machine 
    SPARK_CLASSPATH="$CP" sbin/start-slave.sh 'spark://master_name:7077' 
    

    それとも単にspark-env.shSPARK_CLASSPATHにJARの特定のバージョンを更新します。これは、あなたがの線に沿って何かを行うことができますことを意味します。

  3. Spark 2.0.0に更新中です。 Sparkの新しいバージョンでは、この問題を解決するHttpClient v4.5.2を使用しています。

関連する問題