2017-08-24 1 views
1

DataStreamWriterのクエリプランに最終操作を追加する方法がSparkの構造化ストリーミングにありますか?ストリーミングデータソースから読み込み、何らかの方法でデータを充実させようとしていて、分割された外部テーブル(Hiveと仮定)を寄木張りの形式で書き戻そうとしています。書き込み操作はうまく動作し、私のためにディレクトリ内のデータを分割していますが、作成された可能性のある新しいパーティションのデータをディスクに書き込んだ後に、さらにMSCK REPAIR TABLEまたはALTER TABLE ADD PARTITIONオペレーションを実行する方法はわかりません。Sparkストラクチャードストリーミング外部テーブルの最終操作を実行する(MSCK REPAIR)

することは簡単にするため、一例として、以下のScalaのコードを取る:

SparkSession 
    .builder() 
    .appName("some name") 
    .enableHiveSupport() 
    .getOrCreate() 
    .readStream 
    .format("text") 
    .load("/path/from/somewhere") 
    // additional transformations 
    .writeStream 
    .format("parquet") 
    .partitionBy("some_column") 
    .start("/path/to/somewhere") 
    <-------------------- something I can place here for an additional operation? 
    .awaitTermination() 

潜在的な回避策?:

1:たぶんでしょう.foreach(new ForeachWriter[Row])ようなものを使用して、同様のFileStreamSinkか何かを渡します(外部クエリを実行するのにdef close()を使用して)作業していますが、私はそれを十分に把握して使用することはできませんでした。 - ForeachWriterを使用しても、バッチ完了後にclose()メソッドが呼び出されることはありません。

2:ストリームをフォークします。次の行に沿って何か:

val stream = SparkSession 
    .builder() 
    .appName("some name") 
    .enableHiveSupport() 
    .getOrCreate() 
    .readStream 
    .format("text") 
    .load("/path/from/somewhere") 
    // additional transformations 

stream 
    .writeStream 
    .format("parquet") 
    .partitionBy("some_column") 
    .start("/path/to/somewhere") 
    .awaitTermination() 

stream 
    .map(getPartitionName).distinct 
    .map { partition => 

    // Run query here 

    partition 
    } 
    .writeStream 
    .start() 
    .awaitTermination() 

ここでの問題は、最初の操作が2番目の操作の前に完了することです。

3:クエリの命名と、すべてのパーティションを手動で追加する完了バッチのリスナーの追加。少し無駄ですが、潜在的に実行可能ですか?

これをカバーするドキュメントで何も表示されませんでしたが、うまくいけば何も見逃しませんでした。前もって感謝します。

答えて

0

StreamingQueryListenerを使用すると、良いか悪いのか分かりませんが、

spark.streams.addListener(new StreamingQueryListener() { 

    val client = new Client() 

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = Unit 
    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = Unit 
    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    if (event.progress.numInputRows > 0 && event.progress.sink.description.startsWith("FileSink") && event.progress.sink.description.contains("/path/to/write/directory")) { 
     client.sql(s"MSCK REPAIR TABLE $db.$table") 
    } 
    } 
}) 

あなたは、時間ベースのパーティションを持ってしまった場合は、これはちゃんと限り、あなたはnow()に基づいてパーティションを作成するために、意図したとおりに動作します:

私はこの線に沿って何かを実装

spark.streams.addListener(new StreamingQueryListener() { 

    val client = new Client() 
    var lastPartition: String = "" 
    val dateTimeFormat: String = "yyyy-MM-dd" 

    override def onQueryStarted... 
    override onQueryTerminated... 
    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    if (event.progress.numInputRows > 0 && event.progress.sink.description.startsWith("FileSink[s3") && event.progress.sink.description.contains("/path/to/write/directory")) { 

     val newPartition = new DateTime().toString(dateTimeFormat) 

     if (newPartition != lastPartition) { 
     client.sql(s"ALTER TABLE $db.$table ADD IF NOT EXISTS PARTITION ($partitionColumn='$newPartition')") 
     lastPartition = newPartition 
    } 
    } 
} 
関連する問題