2016-10-05 9 views
2

sparkで操作するために〜20MBの圧縮ファイルが2MBあります。私の最初のアイデアはwholeTextFiles()を使用して、filename - > contentタプルを得ることでした。これは、この種のペアリングを維持する必要があるので便利です(処理がファイルごとに行われ、各ファイルが収集されたデータの1分分を表すためです)。 I /フィルタ/等にデータをマッピングし、このファイル名を維持する必要があるときはいつでもしかし、 - >の関連付けを、コードは醜い取得すなわちwholeTextFilesを使用したスパークデータ操作

Data.map(lambda (x,y) : (x, y.changeSomehow)) 

データ自体ので、それぞれの含有量(効率的かつおそらくありません?)ファイルには、10kのデータ行が含まれているため、別個のRDDとして読むのが良いでしょう。しかし、(私が知る限り)rddのrddを持つことはできません。

処理を簡単にする方法はありますか?基本的に私は各ファイルの内容をrddとして使用できるので、ファイル名(および変換の代わりにリスト内包表記の使用法)を追跡することなく、rdd.map(lambda x: change(x))を行うことができます。

もちろん、分散型アプローチを維持し、いかなる方法でもそれを抑制しないことが目標です。

処理の最後のステップは、すべてをまとめて削減することです。

詳しい背景:あなたは、通常のmap機能(O1-> O2)を持っている場合は、そのパス

+0

を減らすことができます。 Pythonは、Pythonが本質的に遅いので、一方ではPythonが約10倍遅くなり、jvmからpythonにデータを配送する必要があるので、もう一方の側では – Reactormonk

+0

@Reactormonkとしたいと思います。しかし、ファイル内のすべての行を実質的にデコードするには、Pythonスクリプトを使用する必要があります。もっと正確に言えば、私はPythonでコードが見つかったAISメッセージについて話しています。回避策が分かっている場合は、 – Dimebag

+1

https://github.com/dma-ais/AisLibに教えてください。 – Reactormonk

答えて

1

をプロットし、分ごとに(近く)船の衝突を特定しようとしている、あなたはmapValues機能を使用することができます。あなたはflatMap(o1 - > Collection())関数も持っています:flatMapValues。

キー(あなたの場合 - ファイル名)を保持し、値のみを変更します。例えば

reduceByKeyを使用して
rdd = sc.wholeTextFiles (...) 
# RDD of i.e. one pair, /test/file.txt -> Apache Spark 
rddMapped = rdd.mapValues (lambda x: veryImportantDataOf(x)) 
# result: one pair: /test/file.txt -> Spark 

あなたはスピードが必要な場合は、私はスカラ座を示唆して結果

+0

これはすでに改善されました。ありがとうございます!ですから、私の行のいくつかは 'data.mapValues(lambda x:[list comprehension])'のようになります。ここで、listはファイル内のすべての行を含んでいます。 xが効率と可読性の面ではなく、リストの代わりにそれ自身のrddだったのはすばらしいことです。あなたは同意しますか、そうする方法を考えているかもしれませんか? – Dimebag

+0

キーペアRDDの値はRDDにすることはできません。それは任意のシリアライズ可能なデータ構造にすることができます。つまり、クラス 'DocumentContent'を作成することができます。 RDDはデータの論理的な変換計画のようなものですが、コレクションだけではありません。ドライバはRDDアクションを続行するために一連の呼び出しを構成する必要があります。埋め込まれたRDDは多くの問題を引き起こすため、RDDのRDDは許可されません –

関連する問題