Apache Spark Javaを使用してhdfsからファイルをストリームとして読み込むにはどうすればよいですか? ファイル全体を読み込みたくない場合は、条件が満たされたときにファイルの読み込みを停止するためにファイルストリームを作成したいのですが、どのようにApache Sparkで行うことができますか?Apache SparkファイルをHDFSからストリームとして読み込みます
6
A
答えて
1
あなたはSSCメソッドを使用してストリーミングHDFSファイルを使用することができます
ヴァルSSC =新しいStreamingContext(sparkConf、秒(batchTime))
ヴァルDSTREAM = ssc.fileStream [LongWritable、テキスト、TextInputFormat]( streamDirectory 、(x:パス) => true、newFilesOnly = false)
上記の使用api 処理するパスをフィルタリングする機能。
条件がファイルパス/名前ではなく、データに基づいている場合は、条件が満たされればストリーミングコンテキストを停止する必要があります。
1つのスレッドでは、ストリーミングコンテキストが停止していることを確認し、sscが停止している場合は、別のスレッドに待機して新しいストリーミングコンテキストを作成するよう通知する必要があります。
2)2番目のスレッドでは、条件を確認し、条件が満たされた場合にストリーミングコンテキストを停止する必要があります。
説明が必要な場合はお知らせください。
+0
私は例えば2つのファイルがあり、それぞれから数行から数えてN行だけを読みたいという問題があります。あなたのソリューションは非常に高価になります。 – Maksym
関連する問題
- 1. SparkのストリームからH2OへのJSONファイルの読み込み
- 2. ストリームからストリームを読み込み[C#]
- 3. ファイルからバイト[]を読み込み、読み込みます
- 4. Apache Sparkのテキストファイルを読み込みできません。
- 5. スカラーを使用してhdfsからデータを読み込みます
- 6. ストリーム画像としてPCから画像を読み込みます
- 7. リモートHDFSからファイルを読み取る
- 8. fread()とgrep-lostカラム名を使用してHDFSからRにcsvファイルを読み込みます
- 9. WPF - ストリームからフォントを読み込みますか?
- 10. sc.textFile(APACHE SPARK RDD)を使用して読み込み中にカンマをエスケープする
- 11. BigQueryからSparkに効率的に読み込みますか?
- 12. アンドロイドボックスのHDMI入力から生ストリームを読み込みます
- 13. ルビからUSB入力ストリームを読み込みます。
- 14. Spark/ScalaのHDFSへの書き込み
- 15. Apache Samzaのファイルをローカルファイルシステムとhdfsシステムから読み取る方法
- 16. ファイルから読み込み、\ nとスペースを削除します
- 17. Apache SparkがS3を読み込みます:thread.lockオブジェクトをpickleできません
- 18. ストリームを含むファイルを読み込む
- 19. namenode.LeaseExpiredException非hdfsソースからの読み込み時
- 20. テキストファイルから読み込むときにApache Ignite Cacheを読み込む方法
- 21. リポジトリクラスをjsonファイルから読み込み、ファイルの読み込み方法は?
- 22. Akkaストリームを使用したCSVファイルの読み込み
- 23. ストリームで読み込み中にファイルに書き込んでいますか?
- 24. xlsとxlsxからの読み込みと書き込みは、Javaのファイルexcher
- 25. ファイルからクラスオブジェクトを読み込みC++
- 26. アセンブリMipsはファイルとバッファからテキストを読み込みます
- 27. .htaccessファイルからApacheモジュールを読み込むことはできますか?
- 28. 読み込み用のストリームとしてのJava文字列
- 29. Spark Streaming:ストリームにパイプラインを読み込む方法
- 30. Unicodeファイルの読み込みUnicodeファイルの読み込み
Chck this :: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala – yoga
この例はありません私の質問に関連して。 – Maksym
達成しようとしていることをよりよく説明できますか?なぜそれをストリームとして必要とするのですか(単にRDD /データフレームとして読むのではなく)?スパークストリーミングを使用してHDFSディレクトリの内容を読み込み、次の時間を待たずに終了する方法を尋ねていますか? DStreamや構造化ストリーミングについても話していますか? –