2017-02-17 19 views
1

私は、ZIPファイルを読み込み、解凍して内容を新しいファイルのセットに書き込むためのspark/scalaプログラムを作成しています。これをローカルファイルシステムに書き込むことができますが、HDFSなどの分散ファイルシステムに出力ファイルを書き込む方法があるのか​​どうか疑問に思っています。コードは(あなたが依存manangementツールとしてSBTを使用している場合、依存関係にthathライブラリを追加し、)あなたが簡単にHadoop-共通ライブラリを使用してHDFSにデータを書き込むことができSpark/ScalaのHDFSへの書き込み

import java.util.zip.ZipInputStream 
import org.apache.spark.input.PortableDataStream 
import java.io._ 

var i =1 
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String,  PortableDataStream)) => 
    { 


    val zipStream = new ZipInputStream(file._2.open)    
    val entry = zipStream.getNextEntry        
    val iter = scala.io.Source.fromInputStream(zipStream).getLines   

    val fname = f"/d/tmp/myfile$i.txt" 


    i = i + 1 

    val xx = iter.mkString 
    val writer = new PrintWriter(new File(fname)) 
    writer.write(xx) 
    writer.close() 

    iter              
    }).collect() 

`

答えて

3

below`示されています。そして、あなたは、例えば、Aのために、書くことができ

private val fs = { 
    val conf = new Configuration() 
    FileSystem.get(conf) 
    } 

は(などコア-site.xmlに、)あなたのHadoopクラスタ情報とファイルシステムを設定してください

:それを使用すると、FileSystemのオブジェクトを作成することができますパスの文字列(あなたのケースでは、あなたがストリームに対処する必要があります)、HDFS上で次のように:

@throws[IOException] 
    def writeAsString(hdfsPath: String, content: String) { 
    val path: Path = new Path(hdfsPath) 
    if (fs.exists(path)) { 
     fs.delete(path, true) 
    } 
    val dataOutputStream: FSDataOutputStream = fs.create(path) 
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8")) 
    bw.write(content) 
    bw.close 
    } 
0

あなたは公式ドキュメントからの方法saveAsTextFileを見ている必要があります。http://spark.apache.org/docs/latest/programming-guide.html

それはあなたがHDFSに保存することができます:

iter.saveAsTextFile("hdfs://...") 
+0

このコードでは、iterはRDDではないため、書き込むことはできません。たぶん、コンバージョンを最初にしています。 – dumitru

+0

はい、私たちはここで良いキャストになると思います。 RDDは、クラスタ上で分散データを取得するためにsparkを操作するデータ型でなければなりません。 – chateaur

+0

それは問題の要です。私はsaveasTextFileの使用を有効にするためにRDDに自分のiterでデータを取得するために考えることができるすべてを試したが、短くなってきた。もし誰かがこれを解決したら、私に知らせてください – user2699504

0

あなたはsaveAsTextFile方法を試すことができます。

データファイルの要素を、ローカルファイルシステム、HDFSまたは他のHadoopでサポートされているファイルシステムの特定のディレクトリにテキストファイル(またはテキストファイルのセット)として書き込みます。 Sparkは各要素のtoStringを呼び出して、ファイル内のテキスト行に変換します。

パーティションを別のファイルとして保存します。パーティションを分割したり、合体したりしない限り、最終的なパーティションの数は入力ファイルの数と同じになります。

+0

saveasTextFileを使っている理由について私の上のコメントを参照してください。 – user2699504

+0

RDD全体を個別に書くことはできません。代わりにsaveAsTextファイルを使用しますか? – NetanelRabinowitz

+0

これは、解凍されたすべてのデータを1つのファイルに連結します。それは私が望むものではありません。私はそれぞれの解凍されたファイルを別々のファイルに入れたい – user2699504

0
sc.binaryFiles("/user/example/zip_dir", 10)        //make an RDD from *.zip files in HDFS 
      .flatMap((file: (String, PortableDataStream)) => {     //flatmap to unzip each file 
       val zipStream = new ZipInputStream(file._2.open)    //open a java.util.zip.ZipInputStream 
       val entry = zipStream.getNextEntry        //get the first entry in the stream 
       val iter = Source.fromInputStream(zipStream).getLines   //place entry lines into an iterator 
       iter.next              //pop off the iterator's first line 
       iter               //return the iterator 
      }) 
      .saveAsTextFile("/user/example/quoteTable_csv/result.csv") 
関連する問題