2016-05-06 6 views
0

私は数百GBのデータセット(約2B行)を扱っています。操作の1つは、RDDまたはスカラケースオブジェクト(倍精度、マップ、セットを含む)を単一のエンティティに縮小することです。最初は私の操作はgroupByKeyでしたが、遅くて高GCを行っていました。だから私はaggregateByKeyに変換して、後でreduceByKeyにも変換して、ユーザーの高いメモリ割り当て、shuffleのアクティビティ、groupByと出会った高いgcの問題を避けることを望んでいました。スパーク1.5.2シャッフル/シリアル化 - メモリ不足

アプリケーションリソース: 23GB exec mem + 4GBのオーバーヘッド。インスタンス20個、コア6個。 0.2から0.4

可用性クラスタリソース 10ノード、 読み出された入力データは、約20フィールドを有する設定されたジョブについて糸のための600ギガバイトの合計、32ギガバイト最大コンテナサイズ

2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn2.mycorp:45993 
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on 10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB) 
2016-05-02 22:39:03,704 WARN [New I/O worker #5] org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xa8147f0c, /10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) 
java.lang.OutOfMemoryError: Java heap space 
     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
     at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) 
     at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649) 
     at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530) 
     at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77) 
     at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46) 
     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194) 
     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152) 
     at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335) 
     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366) 
     at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290) 
     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) 
     at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:744) 
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver] 
java.lang.OutOfMemoryError: Java heap space 
     at java.util.Arrays.copyOf(Arrays.java:2271) 
     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) 
     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) 
     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) 
     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) 
     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) 
     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) 
     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) 
     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) 
     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842) 
     at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743) 
     at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718) 
     at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-2] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver] 
java.lang.OutOfMemoryError: Java heap space 
     at java.util.Arrays.copyOf(Arrays.java:2271) 
     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) 
     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) 
     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) 
     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) 
     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) 
                                                           67247,1  99% 

にシャッフル比で再生。 1B-2B。 10以上の固有フィールドを集約した出力データセットを作成します。これは基本的にクエリ条件になります。しかし、これらの10個のフィールドのうち、3個のフィールドはそれらのさまざまな組み合わせを表しているため、複数のレコードを照会してセットを取得する必要はありません。これら3つのフィールドのうち、a、b、cはそれぞれ11,2および2の可能な値を持つことができます。与えられた鍵に対して2^11 -1 * 2^2 - 1 * 2^2 -1の組み合わせを得ることができた。

//pseudo code where I use aggregateByKey 

case class UserDataSet(salary: Double, members: Int, clicks: Map[Int, Long], 
    businesses: Map[Int, Set[Int]])...) //About 10 fileds with 5 of them are maps 

def main() = { 

     create combinationRDD of type (String, Set[Set]) Rdd from input dataset which represent all combination 
     create a joinedRdd of type (String, UserDataSet) - where key at this point already a final key which contains 10 unique fields; value is a UserDataSet 

//This is where things fails 
    val finalDataSet = joinedRdd.aggregateByKey(UserDataSet.getInstance())(processDataSeq, processDataMerge) 

}  

private def processDataMerge(map1: UserDataSet, map2: UserDataSet) = { 

    map1.clicks ++= map2.clicks (deep merge of course to avoid overwriting of map keys) 
    map1.salary += map2.salary 

    map1 
} 
+0

私たちがあなたを助けるためには、2Bの行で実際に何をしているのかを見るには、少なくともコードを見る必要があります。 –

+0

ジョブの説明と疑似コードを追加しました。もっと情報を提供する必要があるかどうかを教えてください。 – nir

答えて

0

助けを得るためには、コードを掲示し、入力データの説明も与えてください。

なぜデータですか? キー別に集計する場合、最適な並列性を実現し、問題を回避するには、キー配布の外観およびカーディナリティを理解することが重要です。

私はそれらが何であるか、なぜ重要であるのかを説明しましょう。 のは、あなたが国によって集計しているとしましょう...そこに地球上の約250カ国があるので、低カーディナリティがあなたの並列性を抑圧する可能性があるため、キーのカーディナリティは約250

カーディナリティが重要です。たとえば、データの90%が米国向けで、ノード数が250であれば、1ノードはデータの90%を処理します。

これは、ディストリビューションのコンセプトにつながります。つまり、キーごとにグループ化している場合、キーごとにいくつの値を持つかは値の分布です。最適な並列処理を実現するには、理想的にはすべてのキーにおおよそ同じ数の値が必要です。

データのカーディナリティが非常に高いものの、値の分布が最適でない場合、統計的には均等になるはずです。 たとえば、ほとんどのユーザーが数ページしか訪れていないが、ロボットを訪れている人の多くが訪問するApacheログがあるとします。 ユーザーの数がノードの数よりもはるかに多い場合、データの多いユーザーはノードの周りに分散し、並列性は影響を受けません。

カーディナリティの低いキーを使用すると、通常、問題が発生します。 値の分布が良好でない場合、不均衡な洗濯機では問題にならないでしょう。

最後に、aggregateByKeyで行っていることに大きく依存します。いずれかのマップにオブジェクトが漏れている場合、または処理のフェーズを短縮している場合は、メモリを使い尽くすことができます。

+0

ありがとうございます。私はデータがうまく分散していないことを知っている初期データセットと比較した最終キーのカーディナリティは100:1です。 1B入力セット - 10M出力セット。それは理にかなっていますか?しかし、私はシャッフル前とシャッフル後に、操作が失敗している場所を比較する必要があると思います。ドライバログにこれらのエラーが表示される理由を説明できますか?エグゼクティブやドライバーからのエラーですか?エグゼキュータやドライバに与えられた全体的なメモリに起因するのでしょうか、それともスパークシャッフルブロック自体の限界ですか? – nir

1

ドライバーが実際にメモリ不足であり、エグゼキュータではありませんでした。ドライバログにエラーがありました。デュ。しかし、ログからはそれほど明確ではありませんでした。ドライバは使い果たされました。なぜなら、1)デフォルトの-Xmx900mを使用していたからです。2)Sparkドライバはakka libsに依存しています。akka libsは、ストリームの代わりにByte配列を使用してオブジェクトを直列化する頑強なJavaSerializerに依存します。一時的な解決策として、私はspark.driver.memoryを私の場合には4096mに増やして以来、私はメモリエラーを見たことがありません。問題空間へのいくつかの洞察に感謝します。