2016-12-10 8 views
3

Spark Structured StreamingでS3バケットからデータを読み取ろうとしています。以下のコードは、既存のデータを取得するために機能します。しかし、新しいデータがバケットに追加されると、Sparkはこれを選択しません。Spark Structured Streamingで新しいデータがS3から取得されない

val lines = spark.readStream.schema(schemaImp).format("com.databricks.spark.avro").load("s3n://bucket/*") 
val query = lines.writeStream.outputMode("append").format("memory").queryName("memtable").start() 
query.processAllAvailable() 
spark.sql("select * from memtable").show() 

新しいデータをフェッチする方法を教えてください。または、これはまだサポートされていない機能ですか?

答えて

1

最初にローカルFSに対してテストします。そこでは動作しますが、S3に対しては動作しない場合は、s3 rename/commitを使ったいくつかの変です。それがローカルFSに対して動作しない場合は、ストリーミングを使用している方法です。たぶん、.map()が呼び出されるたびにログを記録するテストを試してみてください。

ストリーミングおよびオブジェクトストアを使用している場合は、(a)s3nをs3n上で使用し、(b)save + renameではなくオブジェクトストアパスに直接保存します。不完全なデータの処理を避けるためにのみ必要です。ファイルの書き込み中にファイルが表示されるファイルシステム

+0

ローカルファイルシステムを試してみましたが、これは同じ動作です。私はメモリ内のテーブルに出力をストリームすると推測していますが、出力は新しいデータを取得しません。私は入力上の複数の集約操作を防ぐ構造化ストリーミングの制限を回避しようとしています。私がアップデートを受け取るために出力ストリーミングを得ることができるなら、私は任意の数の集約操作を行うことができます。これが理にかなってほしい。 – Kaptrain

+0

@Kaptrainログはありますか?これはバグのようです。 – zsxwing

+0

出力ログを[このリンク](https://www.dropbox.com/s/nauda93y6nzz154/S3Table.log?dl=0)にアップロードしました。小さなテーブルが表示されている部分がS3の第1ファイルから取り出されます。その下の行は、S3に新しいファイルを追加した後に生成されます。したがって、Sparkは新しいファイルを検出しますが、出力テーブルへの更新はストリーミングしません。 – Kaptrain

関連する問題