2017-09-26 3 views
2

完全モードを使用してストリーミングデータフレームに集約を適用しました。ローカルのデータフレームを保存するために、私はforeachシンクを実装しました。私はテキスト形式でデータフレームを保存することができます。しかし私は寄木張りの形でそれを保存する必要があります。完全な出力モードでストリーミング集約を寄せ木にする方法

val writerForText = new ForeachWriter[Row] { 
    var fileWriter: FileWriter = _ 

    override def process(value: Row): Unit = { 
     fileWriter.append(value.toSeq.mkString(",")) 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
     fileWriter.close() 
    } 

    override def open(partitionId: Long, version: Long): Boolean = { 
     FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}")) 
     fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp")) 
     true 

    } 
    } 

val columnName = "col1" 
frame.select(count(columnName),count(columnName),min(columnName),mean(columnName),max(columnName),first(columnName), last(columnName), sum(columnName)) 
       .writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start() 

どうすれば実現できますか? ありがとうございます!

+0

あなたは完全/更新モードで寄木細工して保存する方法を発見しましたか? –

+0

はい、https://github.com/chtefi/parquet-custom-reader-writerを使用してカスタムライターを作成しました –

答えて

-1

ローカルでデータフレームを保存するには、foreach sinkを実装しました。私はテキスト形式でデータフレームを保存することができます。しかし私は寄木張りの形でそれを保存する必要があります。

ストリーミングデータセットを保存するデフォルトのフォーマットは... 寄木細工です。これで、かなり高度なforeachシンクを使用する必要はありませんが、単にparquetです。

次のようにクエリは次のようになります。

scala> :type in 
org.apache.spark.sql.DataFrame 

scala> in.isStreaming 
res0: Boolean = true 

in.writeStream. 
    option("checkpointLocation", "/tmp/checkpoint-so"). 
    start("/tmp/parquets") 
+0

ストラクチャードストリーミングでは、メモリ以外の完全モードでシンクにデータフレームを書き込むことはできません。それを保存するには、foreach sinkを実装する必要があります。私たちはあなたが提案した完全モードでこれを行うことはできません。 –

+0

あなたは正しいかもしれません...あまりにも急いでこれに答えてください。私はそれについて考えてみましょう... –

+0

@MaheshChandKandpalあなたのシンクはファイルであるので、** File Sink **をJacekの答えのように追加モードで使用するのが理にかなっています。 Foreachシンクを使用したソリューションが熟考されているようです –

関連する問題