2016-09-01

How do I fix "org.apache.spark.shuffle.FetchFailedException: Failed to connect" in the NetworkWordCount Spark Streaming application? I try to submit the Apache Spark Streaming example NetworkWordCount, passing the master IP and the local port as input parameters (with nc -lk 9999 running in another console):

/opt/spark/bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --deploy-mode cluster --master yarn --driver-memory 2g --executor-memory 2g /opt/spark/examples/jars/spark-examples_2.11-2.0.0.jar 172.29.74.68 9999 


And I always get this error:

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 50, iws1): FetchFailed(BlockManagerId(2, iws2, 41569), shuffleId=0, mapId=19, reduceId=0, message= 
org.apache.spark.shuffle.FetchFailedException: Failed to connect to iws2/172.29.77.40:41569 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) 
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154) 
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50) 
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85) 
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.IOException: Failed to connect to iws2/172.29.77.40:41569 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228) 
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) 
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) 
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    ... 3 more 
Caused by: java.net.ConnectException: Connection refused: iws2/172.29.77.40:41569 
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
    ... 1 more 

Full log here

例 "HdfsWordCountは、" 正常に動作します。他の "非ストリーミング"アプリケーション。

Answer


The solution was to add StorageLevel.MEMORY_ONLY_SER to the socketTextStream call, change spark-defaults.conf (as shown below), and increase the hardware resources available in yarn-site.xml.
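
Applied to the socketTextStream call, the first part of the fix looks roughly like this. This is a minimal, trimmed-down sketch of the standard NetworkWordCount structure; the only change the answer describes is passing StorageLevel.MEMORY_ONLY_SER explicitly:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Pass StorageLevel.MEMORY_ONLY_SER explicitly so received blocks are
    // kept serialized in memory only, reducing memory pressure on executors.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)

    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}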

spark-defaults.conf

spark.core.connection.ack.wait.timeout 600s 
spark.default.parallelism 4 
spark.driver.memory 6g 
spark.executor.memory 10g 
spark.cores.max 4 
spark.executor.cores 2 
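
The answer does not show the yarn-site.xml changes it mentions. A hedged sketch of the kind of NodeManager resource settings that typically need raising; the property names are standard YARN configuration keys, but the values here are illustrative and not from the original answer:

<!-- yarn-site.xml: illustrative values only -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>12288</value> <!-- memory available to containers on each NodeManager -->
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>12288</value> <!-- largest single container the ResourceManager will grant -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>4</value> <!-- vcores available to containers on each NodeManager -->
</property>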