2016-12-30 13 views
2

spark version 1.6.3を使用し、yarn version 2.7.1.2.3にはHDP-2.3.0.0-2557が付いています。私が使用しているHDPバージョンでは、スパークバージョンが古すぎるので、別のスパークを遠隔に糸モードとして使用することをお勧めします。Connectionが拒否されたため、YARNのSparkアプリケーションがFetchFailedExceptionで失敗するのはなぜですか?

ここで私はsparkシェルを実行します。

./spark-shell --master yarn-client 

すべてがsparkContextsqlContextが初期化され、初期化され、罰金に見えます。ハイブテーブルにもアクセスできます。しかし、場合によっては、ブロックマネージャーに接続しようとすると問題になります。

私は専門家ではありませんが、私が糸モードで実行している間、ブロックマネージャーが私の糸クラスターで動作していると思います。私にとって初めてのネットワーク上の問題だったようで、ここでそれを聞きたくありませんでした。しかし、これは私がまだ理解できなかったいくつかのケースで起こります。だから、これはネットワーク上の問題ではないかもしれないと思う。

ここにコードがあります。

def df = sqlContext.sql("select * from city_table") 

以下のコードは正常です。

df.limit(10).count() 

サイズは10以上ですが、これは実行ごとに変わります。

df.count() 

これにより例外が発生します。

6/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes 
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message= 
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) 
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216) 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) 
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    ... 3 more 
Caused by: java.net.ConnectException: Connection refused: /172.27.247.204:56093 
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
    ... 1 more 

) 

シャッフルするタスクが複数ある場合、これが起こっていることがわかりました。

問題は何か、パフォーマンスの問題か、私が見ることができなかった別のネットワークの問題ですか?シャッフルって何?それがネットワークの問題であれば、それは私のスパークと糸の間か、糸自体の問題ですか?

ありがとうございます。

編集:

ログには何かが表示されます。

時には
17/01/02 06:45:17 INFO DAGScheduler: Executor lost: 2 (epoch 13) 
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster. 
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 172.27.247.204, 51809) 
17/01/02 06:45:17 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor 
17/01/02 06:45:17 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 
17/01/02 06:45:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.27.247.204:51809 with 511.1 MB RAM, BlockManagerId(2, 172.27.247.204, 51809) 

、別のブロックマネージャの動作に再試行することを、しかし、デフォルトとして4倍の最大許容数を超えているので、それはほとんどの時間を終了することはありません。

編集2:

糸はそのことについて本当に静かですが、私は、これはネットワークの問題は、私がどこかに問題を繰り返す可能性があると思います。

このスパークは、HDP環境の外部に配置されています。ヤーンにアプリケーションを発射すると、ヤーンはブロックマネージャーとエグゼクティブをスパークドライバーに知らせます。エグゼキュータは、HDPクラスタ内のデータノードであり、プライベートネットワークに異なるIPを持っています。しかし、クラスターの外でスパーク・ドライバーに通知することになると、すべてのエグゼキューターに対して同じIPアドレスと単一のIPアドレスが与えられます。これは、HDPクラスタ内のすべてのノードがルータ上で同じIPを使用しているためです。 IPアドレスが150.150.150.150で、スパークドライバが接続してそのエグゼキュータから何か質問する必要がある場合、このIPで試してみます。しかし、このIPは、個々のデータノードIPではなく、実際にはクラスタ全体の外部IPアドレスです。

エグゼキュータ(ブロックマネージャ)についてプライベートIPを使って糸に通知する方法はありますか?彼らのプライベートIPには、このスパークドライバが取り組んでいるマシンからもアクセスできるからです。

答えて

2

FetchFailedExceptionリデューサタスク(ShuffleDependencyの場合)がシャッフルブロックをフェッチできなかった場合に例外がスローされます。

Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093 

エグゼOOMed(=のOutOfMemoryErrorがスローされた)または糸が過度のメモリ使用量にそれを殺すことを決めたことができます:それは通常(シャッフルブロックのBlockManager付き)executorが死んだので、例外があることを意味します。

yarn logsコマンドを使用してSparkアプリケーションのログを確認し、問題の根本原因を調べる必要があります。

yarn logs -applicationId <application ID> [options] 

Sparkアプリケーションのエグゼキュータのステータスは、Web UIの[Executors]タブで確認できます。

スパークは通常、影響を受けるタスクを再実行してFetchFailedExceptionから回復します。 Web UIを使用してSparkアプリケーションの動作を確認してください。 FetchFailedExceptionは一時的なメモリ "hiccup"が原因である可能性があります。

+0

糸ログ、同じ接続問題ログには何も表示されていません。私はおそらくヤーンがBlockManagerのログを別の場所に保存していて、ログは何とか無効になっているかもしれません。 BlockManager側からのログがないので、成功しています。 現時点ではspark uiにアクセスできません。それは私が修正しようとしている別の問題です。プロキシが機能しません。 –

+0

ExecutorLostイベントを探します。 –

+0

私はそれについてのログを見ることができませんでした。糸は何とかそれを静かにしてくれると思います。 –

関連する問題