2017-12-07 11 views
0

私はsqlppydataとpysparkを使ってSQLクエリを実行し、出力DFを辞書に変換してmongoに一括挿入しています。 私はスパークDFの辞書への変換をテストするために、多くの同様の質問を行ってきました。pysparkから辞書を作成する最速の方法DF

現在、私はこのメソッドをbulk DFに変換するためにこのメソッドを使用しています。そして、10Kレコードで2〜3秒かかります。

私は私のアイデアをimpliment方法を以下に記載しました:

x = snappySession.sql("select * from test") 
df = map(lambda row: row.asDict(), x.collect()) 
db.collection.insert_many(df) 

は、任意のより高速な方法はありますか?

答えて

0

私はforeachPartitionを使用してお勧めします:

(snappySession 
    .sql("select * from test") 
    .foreachPartition(insert_to_mongo)) 

insert_to_mongo

def insert_to_mongo(rows): 
    client = ... 
    db = ... 
    db.collection.insert_many((row.asDict() for row in rows)) 
+0

コードを確認または実行しましたか?それは私にエラー 'AttributeErrorを与えています: 'itertools.chain'オブジェクトに属性 'asDict''がありません – techie95

0

私はSparkからMongoに直接書き込むことができるかどうかを調べるのは、これが最善の方法だからです。この方法を使用することができ、それに失敗

x = snappySession.sql("select * from test") 
dictionary_rdd = x.rdd.map(lambda row: row.asDict()) 

for d in dictionary_rdd.toLocalIterator(): 
    db.collection.insert_many(d) 

これは、分散的にスパークのすべての辞書を作成します。行はドライバに返され、Mongoに一度に1行ずつ挿入されるため、メモリが不足することはありません。

+0

私はMongoDBのに直接送信するDFの意識です。与えられた[documentation](https://docs.mongodb.com/spark-connector/master/python-api/)から、db認証はありません。これが私がこのようにした理由です。 – techie95

+0

@ShaikRizwana私は答えを更新しました – Anake

+0

@ Anakeに感謝しますが、それは12-15秒近くかかります。あなたが他に提案する方法はありますか? – techie95

関連する問題