2016-04-28 8 views
1

Spark Streamingの新機能です。ssc.filestream()を使用してJavaのzipディレクトリを処理する方法

特定のディレクトリにあるすべての.zipファイルを監視して解凍します。 は私がhttp://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/を参照して

JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class); 

は、しかし、私はFILESTREAMは()zipファイルのexsitedinを処理していないことが判明し、次のコードを記述している/指定したディレクトリに移動しました。

私は何かお見逃しですか?

答えて

0

あなたはここにZipFileInputFormatを使用することができます。https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop

をしてrecord._2は、そのファイルのBytesWriteableあるようrecord._1.toStringはファイル名です

val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory) 

files.foreachRDD{ rdd => 
    rdd.foreachPartition { partition => 
    partition.foreach { record => 
     process(record._1.toString, record._2) 
    } 
    } 
} 

を使用してFILESTREAMを作成します。 InputFormatで.zipを解凍したくない場合は、別のカスタムFileInputFormatが必要になるか、ZipFileInputFormatを変更する必要があります。

これをテストするには、someInputDirectoryに追加する.zipファイルが最後に変更されたことを確認してください。< 1分前に、それ以外の場合、SparkStreamingはデフォルトで無視します。

関連する問題