2017-01-09 14 views
2

Apache NiFiとApache Sparkを使用して大学向けの小さなプロジェクトを行っています。 HDFSからTSVファイルを読み込み、Spark Streamingを使ってNiFiでワークフローを作成したいのですが、ファイルを処理して必要な情報をMySQLに保存できます。私はすでにNiFiでワークフローを作成しており、ストレージ部分は既に動作しています。問題は、私がNiFiパッケージを解析することができないため、それらを使用できることです。スパイクを使用してNiFiデータパケットを解析する

ファイルは、このような行を含める:

linea1File1 TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU 

各スペースは、タブ( "\ t")

ときは、このスカラを使用してスパークでの私のコードです:

val ssc = new StreamingContext(config, Seconds(10)) 
val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY)) 
val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)) 

をここまで私は1つの文字列で私の全体のファイル(7000 +行)を取得することができます...残念ながら私は行にその文字列を分割することはできません。私は行全体のファイルを取得する必要があるので、私はオブジェクトでそれを解析し、いくつかの操作を適用し、私が望むものを格納することができます

誰もが私を助けることができますか?

答えて

3

各データパケットはNiFiからの1つのフローファイルの内容になります。したがって、NiFiがHDFSから1つのデータパケットに含まれる1つのTSVファイルをピックアップすると、

あなたのNiFiフローを見ることなく言うのは難しいですが、ストリーミングを開始する前に、Ni​​FiでTSVを分割するためにSplitTextをライン数1で使用できます。

+0

ありがとうございました...これは私の問題を完全に解決しました...私はNiFiで解決するとは思っていませんでした...私はSparkに焦点を合わせました... –

関連する問題