私は、ZIPファイルを読み込み、解凍して内容を新しいファイルのセットに書き込むためのspark/scalaプログラムを作成しています。これをローカルファイルシステムに書き込むことができますが、HDFSなどの分散ファイルシステムに出力ファイルを書き込む方法があるのかどうか疑問に思っています。コードは(あなたが依存manangementツールとしてSBTを使用している場合、依存関係にthathライブラリを追加し、)あなたが簡単にHadoop-共通ライブラリを使用してHDFSにデータを書き込むことができSpark/ScalaのHDFSへの書き込み
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) =>
{
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
}).collect()
`
このコードでは、iterはRDDではないため、書き込むことはできません。たぶん、コンバージョンを最初にしています。 – dumitru
はい、私たちはここで良いキャストになると思います。 RDDは、クラスタ上で分散データを取得するためにsparkを操作するデータ型でなければなりません。 – chateaur
それは問題の要です。私はsaveasTextFileの使用を有効にするためにRDDに自分のiterでデータを取得するために考えることができるすべてを試したが、短くなってきた。もし誰かがこれを解決したら、私に知らせてください – user2699504