私はS3フォルダにストリーミングされたXMLファイルを処理する必要があります。現在、以下のように実装しています。スパークストリーミングxmlファイル
まず、各RDD用スパークのFILESTREAM
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
を使用して読み取りファイル、任意のファイルを新しいHDFSディレクトリに
を文字列を書くif (data.count() !=0)
を読まれたかどうかを確認
上記のHDFS diから読み取るデータフレームを作成する牧師館
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
は、データフレームにいくつかの処理を行うと、どういうわけか、JSON
としてloaddata.write.mode("append").json("s3://mybucket/somefolder")
を保存し、私は上記のアプローチはボーイッシュ非常に非効率的かつ率直に言って、非常に学校であると感じています。 もっと良い解決策はありますか?どんな助けでも大歓迎です。
フォローアップの質問 データフレーム内のフィールド(列ではない)を操作するにはどうすればよいですか? 私はvey複雑なネストされたxmlを持っています。上記のメソッドを使用すると、Dataframeに9列と50の奇妙な内部構造配列があります。それは、特定のフィールド名をトリミングする必要性を除いては問題ありません。同じ構造をもう一度構築する必要があるので、データフレームを爆発させることなく達成する方法はありますか?
どうもありがとうございました。私のターゲットenvは、Spark 2.0.1でEMRスタックです。私はあなたの提案をEMRボックスで試してみます。 – Vamsi
あなたが上記の解決策で大丈夫であれば、pls vote-up/accept。 –