2016-07-25 10 views
3

S3からsparkに100万の圧縮されたテキストファイルを読み込もうとしています。各ファイルの圧縮サイズは、50 MBから80 MBです。それは約6.5テラバイトのデータです。S3からスパークするためにネストされたテキストファイルを読み込むときにメモリエラーが発生する

残念ながら私は解決方法がわからないメモリ不足例外が発生しています。

raw_file_list = subprocess.Popen("aws s3 ls --recursive s3://my-bucket/export/", shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n') 
cleaned_names = ["s3://my-bucket/" + f.split()[3] for f in raw_file_list if not f.endswith('_SUCCESS')] 
dat = sc.textFile(','.join(cleaned_names)) 
dat.count() 

収量:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-22-8ce3c7d1073e> in <module>() ----> 1 dat.count() 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in count(self) 
    1002   3 
    1003   """ 
-> 1004   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    1005 
    1006  def stats(self): 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in sum(self) 
    993   6.0 
    994   """ 
--> 995   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    996 
    997  def count(self): 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in fold(self, zeroValue, op) 
    867   # zeroValue provided to each partition is unique from the one provided 
    868   # to the final reduce call 
--> 869   vals = self.mapPartitions(func).collect() 
    870   return reduce(op, vals, zeroValue) 
    871 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: java.lang.OutOfMemoryError: GC overhead limit exceeded 

を更新:のような単純なもの

問題の一部は、このpostによって解決されているようです。 S3からたくさんのファイルが消えていたようだ。エラーが更新され、メモリの問題のみが反映されるようになりました。

+0

'spark.driver.memory'、' spark.executor.memory'、おそらくはJava Heapサイズにもっと多くのメモリを割り当てることができますか? – KartikKannapur

答えて

0

問題は、ファイルが多すぎるということでした。解決策は、ファイルのサブセットを読み込み、それらをより小さな数にまとめることによって、パーティションの数を減らすように思われる。しかし、パーティションを大き過ぎることはできません:500 - 1000 MBのファイルは、独自の問題を引き起こします。

関連する問題