spark version 1.6.3
を使用し、yarn version 2.7.1.2.3
にはHDP-2.3.0.0-2557
が付いています。私が使用しているHDPバージョンでは、スパークバージョンが古すぎるので、別のスパークを遠隔に糸モードとして使用することをお勧めします。Connectionが拒否されたため、YARNのSparkアプリケーションがFetchFailedExceptionで失敗するのはなぜですか?
ここで私はsparkシェルを実行します。
./spark-shell --master yarn-client
すべてがsparkContext
がsqlContext
が初期化され、初期化され、罰金に見えます。ハイブテーブルにもアクセスできます。しかし、場合によっては、ブロックマネージャーに接続しようとすると問題になります。
私は専門家ではありませんが、私が糸モードで実行している間、ブロックマネージャーが私の糸クラスターで動作していると思います。私にとって初めてのネットワーク上の問題だったようで、ここでそれを聞きたくありませんでした。しかし、これは私がまだ理解できなかったいくつかのケースで起こります。だから、これはネットワーク上の問題ではないかもしれないと思う。
ここにコードがあります。
def df = sqlContext.sql("select * from city_table")
以下のコードは正常です。
df.limit(10).count()
サイズは10以上ですが、これは実行ごとに変わります。
df.count()
これにより例外が発生します。
6/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.net.ConnectException: Connection refused: /172.27.247.204:56093
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
)
シャッフルするタスクが複数ある場合、これが起こっていることがわかりました。
問題は何か、パフォーマンスの問題か、私が見ることができなかった別のネットワークの問題ですか?シャッフルって何?それがネットワークの問題であれば、それは私のスパークと糸の間か、糸自体の問題ですか?
ありがとうございます。
編集:
ログには何かが表示されます。
時には17/01/02 06:45:17 INFO DAGScheduler: Executor lost: 2 (epoch 13)
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 172.27.247.204, 51809)
17/01/02 06:45:17 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
17/01/02 06:45:17 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/01/02 06:45:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.27.247.204:51809 with 511.1 MB RAM, BlockManagerId(2, 172.27.247.204, 51809)
、別のブロックマネージャの動作に再試行することを、しかし、デフォルトとして4倍の最大許容数を超えているので、それはほとんどの時間を終了することはありません。
編集2:
糸はそのことについて本当に静かですが、私は、これはネットワークの問題は、私がどこかに問題を繰り返す可能性があると思います。
このスパークは、HDP環境の外部に配置されています。ヤーンにアプリケーションを発射すると、ヤーンはブロックマネージャーとエグゼクティブをスパークドライバーに知らせます。エグゼキュータは、HDPクラスタ内のデータノードであり、プライベートネットワークに異なるIPを持っています。しかし、クラスターの外でスパーク・ドライバーに通知することになると、すべてのエグゼキューターに対して同じIPアドレスと単一のIPアドレスが与えられます。これは、HDPクラスタ内のすべてのノードがルータ上で同じIPを使用しているためです。 IPアドレスが150.150.150.150
で、スパークドライバが接続してそのエグゼキュータから何か質問する必要がある場合、このIPで試してみます。しかし、このIPは、個々のデータノードIPではなく、実際にはクラスタ全体の外部IPアドレスです。
エグゼキュータ(ブロックマネージャ)についてプライベートIPを使って糸に通知する方法はありますか?彼らのプライベートIPには、このスパークドライバが取り組んでいるマシンからもアクセスできるからです。
糸ログ、同じ接続問題ログには何も表示されていません。私はおそらくヤーンがBlockManagerのログを別の場所に保存していて、ログは何とか無効になっているかもしれません。 BlockManager側からのログがないので、成功しています。 現時点ではspark uiにアクセスできません。それは私が修正しようとしている別の問題です。プロキシが機能しません。 –
ExecutorLostイベントを探します。 –
私はそれについてのログを見ることができませんでした。糸は何とかそれを静かにしてくれると思います。 –