2017-02-14 6 views
0

ログファイルの作成日に基づいて、異なるディレクトリにログファイルが移動しています。Sparkの複数のディレクトリからのログを結合する

> /mypath/2017/01/20/... 
. 
. 
. 
> /mypath/2017/02/13/... 
> /mypath/2017/02/14/... 

例えば

私は、このマスターファイルに集計を行うことができるようにpysparkを使用して1つのRDDにすべてのこれらのログファイルを結合したいと思います。

日付までに、sqlContextという名前の個別のディレクトリを作成し、特定の日付のすべてのログファイルを結合するためにUnionを使用しました。

DF1 = (sqlContext.read.schema(schema).json("/mypath/2017/02/13")).union(sqlContext.read.schema(schema).json("/mypath/2017/02/14")) 

日付の範囲からログファイルを指定すると、マスターrddを簡単に取得できますか? (つまり、2017/01/20から2017/02/14まで)

私は非常に新しいスパークです。私が間違っていたら私を修正してください。あなたはsqlContextに固執する場合

+0

また、新しいデータフレーム上のすべてのビジネスロジックを実行することであろう別の解決策は、ベースのフィルタへこれらのログ(DF1など)にすべて参加した後、「タイプ」列に表示されます。そうする最適なプロセスは何でしょうか? (私は通常、DF1.filter()を使用します)。他の効率的な方法はありますか? – SpaceOddity

+0

sqlContext.read.schema(schema).json( "/ mypath/2017/02/[13-14]"))が機能していません。それは "不正なファイルパターン:インデックス4の近くの不正な文字範囲" – SpaceOddity

答えて

1

その後、簡単な解決策は、それはあなたがしたい場合は、できるソースからの労働組合への入力ディレクトリ

case class FileWithDate(basePath: String, year: Int, month: Int, day: Int) { 
def path = s"${basePath}/${year}/${month}/${day}" 
} 

def listFileSources() : List[FileWithDate] = ??? // implement here 

内のすべてのデータフレームを、すべてのファイルを一覧表示するメソッドを定義することになりますこのようにそれを実行します。

// create an empty dataframe with the strucutre for the json 
val files = listSources() 
val allDFs = files.foldLeft(emptyDF){case (df, f) => df.union(sqlContext.read.schema(schema).json(f.path))} 

日付で入力ファイルをフィルタリングする場合は、それが簡単になります。この

files.filter(_.year == 2016 && (_.month >=2 || _.month <=3)) 

のようなもの、それは、年、月、日を使用してデータフレームを(追加の列を入れる)強化し、私がしたい場合は、

+0

私はあなたがlistFileSourcesメソッドを実装することができる最小限のScalaの知識を持っていると思います。あなたがするべきことは、mypathフォルダ内のすべてのファイルを取得し(サブフォルダを再帰的に反復する)、FileWithDate型のオブジェクトを作成することです。これらのオブジェクトは、メソッドによって返されるリストに追加されます。 – dumitru

関連する問題