2016-11-18 4 views
2

私はS3フォルダにストリーミングされたXMLファイルを処理する必要があります。現在、以下のように実装しています。スパークストリーミングxmlファイル

まず、各RDD用スパークのFILESTREAM

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

を使用して読み取りファイル、任意のファイルを新しいHDFSディレクトリに

を文字列を書く

if (data.count() !=0) 

を読まれたかどうかを確認

上記のHDFS diから読み取るデータフレームを作成する牧師館

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir) 

は、データフレームにいくつかの処理を行うと、どういうわけか、JSON

として
loaddata.write.mode("append").json("s3://mybucket/somefolder") 

を保存し、私は上記のアプローチはボーイッシュ非常に非効率的かつ率直に言って、非常に学校であると感じています。 もっと良い解決策はありますか?どんな助けでも大歓迎です。

フォローアップの質問 データフレーム内のフィールド(列ではない)を操作するにはどうすればよいですか? 私はvey複雑なネストされたxmlを持っています。上記のメソッドを使用すると、Dataframeに9列と50の奇妙な内部構造配列があります。それは、特定のフィールド名をトリミングする必要性を除いては問題ありません。同じ構造をもう一度構築する必要があるので、データフレームを爆発させることなく達成する方法はありますか?

答えて

1

あなたがスパーク2.0を使用している場合は、それが構造化されたストリーミングで動作させることができるかもしれ:

val inputDF = spark.readStream.format("com.databricks.spark.xml") 
    .option("rowTag", "Trans") 
    .load(path) 
+0

どうもありがとうございました。私のターゲットenvは、Spark 2.0.1でEMRスタックです。私はあなたの提案をEMRボックスで試してみます。 – Vamsi

+0

あなたが上記の解決策で大丈夫であれば、pls vote-up/accept。 –

関連する問題