DataStreamWriter
のクエリプランに最終操作を追加する方法がSparkの構造化ストリーミングにありますか?ストリーミングデータソースから読み込み、何らかの方法でデータを充実させようとしていて、分割された外部テーブル(Hiveと仮定)を寄木張りの形式で書き戻そうとしています。書き込み操作はうまく動作し、私のためにディレクトリ内のデータを分割していますが、作成された可能性のある新しいパーティションのデータをディスクに書き込んだ後に、さらにMSCK REPAIR TABLE
またはALTER TABLE ADD PARTITION
オペレーションを実行する方法はわかりません。Sparkストラクチャードストリーミング外部テーブルの最終操作を実行する(MSCK REPAIR)
することは簡単にするため、一例として、以下のScalaのコードを取る:
SparkSession
.builder()
.appName("some name")
.enableHiveSupport()
.getOrCreate()
.readStream
.format("text")
.load("/path/from/somewhere")
// additional transformations
.writeStream
.format("parquet")
.partitionBy("some_column")
.start("/path/to/somewhere")
<-------------------- something I can place here for an additional operation?
.awaitTermination()
潜在的な回避策?:
1:たぶんでしょう - ForeachWriterを使用しても、バッチ完了後に.foreach(new ForeachWriter[Row])
ようなものを使用して、同様のFileStreamSink
か何かを渡します(外部クエリを実行するのにdef close()
を使用して)作業していますが、私はそれを十分に把握して使用することはできませんでした。close()
メソッドが呼び出されることはありません。
2:ストリームをフォークします。次の行に沿って何か:
val stream = SparkSession
.builder()
.appName("some name")
.enableHiveSupport()
.getOrCreate()
.readStream
.format("text")
.load("/path/from/somewhere")
// additional transformations
stream
.writeStream
.format("parquet")
.partitionBy("some_column")
.start("/path/to/somewhere")
.awaitTermination()
stream
.map(getPartitionName).distinct
.map { partition =>
// Run query here
partition
}
.writeStream
.start()
.awaitTermination()
ここでの問題は、最初の操作が2番目の操作の前に完了することです。
3:クエリの命名と、すべてのパーティションを手動で追加する完了バッチのリスナーの追加。少し無駄ですが、潜在的に実行可能ですか?
これをカバーするドキュメントで何も表示されませんでしたが、うまくいけば何も見逃しませんでした。前もって感謝します。