2016-04-27 8 views
0

BSONダンプを解析するスクリプトがありますが、圧縮されていないファイルでのみ動作します。 gz bsonファイルを読みながら空のRDDを取得します。PySpark:gzipped BSONファイルの読み込み時に空のRDD

pyspark_location = 'lib/pymongo_spark.py' 
HDFS_HOME = 'hdfs://1.1.1.1/' 
INPUT_FILE = 'big_bson.gz' 


class BsonEncoder(JSONEncoder): 
    def default(self, obj): 
     if isinstance(obj, ObjectId): 
      return str(obj) 
     elif isinstance(obj, datetime): 
      return obj.isoformat() 
     return JSONEncoder.default(self, obj) 


def setup_spark_with_pymongo(app_name='App'): 
    conf = SparkConf().setAppName(app_name) 
    sc = SparkContext(conf=conf) 
    sc.addPyFile(pyspark_location) 
    return sc 


def main(): 
    spark_context = setup_spark_with_pymongo('PysparkApp') 
    filename = HDFS_HOME + INPUT_FILE 
    import pymongo_spark 
    pymongo_spark.activate() 
    rdd = spark_context.BSONFileRDD(filename) 
    print(rdd.first()) #Raises ValueError("RDD is empty") 

私はモンゴ-javaのドライバ-3.2.2.jar、モンゴ-Hadoopの-火花1.5.2.jar、pymongo-3.2.2-py2.7-のlinux-x86_64版とpymongo_sparkでを使用していますspark-submitと一緒に。 展開されたSparkのバージョンは、Hadoop 2.6.4とともに1.6.1です。

私はライブラリが圧縮されたBSONファイルの分割をサポートしていないことを認識していますが、分割は1つで済むはずです。 私は、分析するために圧縮されたBSONファイルが何百もあり、それらのそれぞれを実行可能なオプションではないように思われます。

どのようにすればよいでしょうか? ありがとうございます!

答えて

0

環境でテストしたところ:mongo-hadoop-spark-1.5.2.jar、Hadoop 2.6.4、Pymongo 3.2.2のスパークバージョン1.6.1です。ソースファイルはmongodump compressedの出力で、1つの分割(圧縮されていない収集サイズは105MB)の小さなサイズのファイルです。 PySparkを通じて実行:

from pyspark import SparkContext, SparkConf 
import pymongo_spark 
pymongo_spark.activate() 
conf = SparkConf().setAppName("pyspark-bson") 
file_path = "/file/example_bson.gz" 
rdd = sc.BSONFileRDD(file_path) 
rdd.first() 

圧縮されたBSONファイルを読み取ることができ、そして最初のドキュメントを記載されています。入力ファイルにアクセスできることを確認し、ファイルが正しいBSON形式であることを確認してください。

関連する問題