私はSparkとHadoopエコシステムを初めて使い、すでに恋に落ちています。 今、私はSparkに既存のJavaアプリケーションを移植しようとしています。ファイル入力用の手動分割アルゴリズムの定義
このJavaアプリケーションは、次のように構成されています
- 読むファイル(複数可)入力データにいくつかの重いコンピューティングを行うカスタムパーサクラスと
BufferedReader
と一つずつ。入力ファイルのサイズは、1〜最大2.5 GBです。 - メモリにデータを格納する(
HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>
) - メモリ内データストアをJSONとして書き出します。これらのJSONファイルはサイズが小さいです。
ファイルを1人の作業者が処理するScalaアプリケーションを作成しましたが、これは明らかにSparkから得られるパフォーマンス上のメリットではありません。
これをSparkに移植することで問題が発生しました。 入力ファイルは行ベースです。私は通常、1行に1つのメッセージがあります。ただし、一部のメッセージは、前の行に依存して、実際の有効なメッセージをパーサーに形成します。
- {タイムスタンプ}#0x033#{data_bytes} \ nは
- {タイムスタンプ}#0x034:例えば、私が入力ファイルに次の順序でデータを取得することを起こる可能性#{data_bytes} \ n
- {タイムスタンプ}#0x035#{data_bytes} \ n
- {タイムスタンプ}#0 "組成物メッセージ" 0x036のうち、パーサーも必要であることを実際のメッセージを形成するx0FE位{data_bytes} \ n
- {タイムスタンプ}#0x036#{data_bytes} \ n
メッセージ0x033、0x034、0x035の行他のメッセージもまた、これらの必要なメッセージのセットの間に入ることができる。しかし、ほとんどのメッセージは1行を読むことで解析できます。
最後に私の質問 私の目的のためにSparkにファイルを正しく分割させる方法を教えてください。ファイルを「ランダムに」分割することはできません。それらは、すべてのメッセージが解析され、パーサーが決して得ることのない入力を待たないように分割されなければなりません。つまり、各コンポジションメッセージ(前の行に依存するメッセージ)を1つの分割にする必要があります。
- は、ファイル入力のためのマニュアルの分割アルゴリズムを定義します。
は、私はそこに正しい出力を達成するために、いくつかの方法があるが、私は私もこの記事に持っていたいくつかのアイデアを投げますね?これは、分割の最後の数行に "大きな"メッセージ[0x033,0x034,0x035]の先頭が含まれていないことを確認します。
- ファイルを分割しますが、最後の分割から次の分割に固定数の行を追加します(これは確かにジョブを実行します)。複数のデータはParserクラスによって正しく処理され、問題は導入されません。
2番目の方法は簡単かもしれませんが、私はSparkでこれをどのように実装するのか手がかりがありません。誰かが私を正しい方向に向けることができますか?
ありがとうございます!
なぜ私はもともとこの質問を始めました:私は、スパークが間違った行でファイルを分割する可能性があると考えました。 これはSparkが最後の行が「合成メッセージ」の開始点であるときに新しい分割を行うことを決めることができ、私の例では[0x033]を含んでいます。私のJava-Parsing-Codeは、各Splitが別のワーカーによって処理される可能性があるため、以前の行にアクセスすることができないと考えられています。 これは、私がカスタム入力形式で "前処理"する必要があると考えているため、これらの依存している行のメッセージを正しく解析できるかどうかを確認することです。 – j9dy
2番目の提案が問題を解決する別の方法かもしれません。これを持っていただきありがとうございます。 私はそれに関するさらに多くの質問を持っています: - 私の各ファイルには50.000.000行までが含まれています。これはあなたが提案したフォーマットにラインをマッピングする際に問題を引き起こすでしょうか? - 後でこれらのKey/Value-List([オブジェクト33、0x033]、[オブジェクト33、0x034]、...)を処理する方法はありますか? - 名前が "オブジェクト33"のような1つのキーが合計値の70%のようになったらどうなりますか?これは何らかの形でさらなる処理のために分割されますか? – j9dy
あなたの最初のコメントについて:Sparkは、あなたがそのようにプログラムするときに、別の場所に新しいスプリットを作ることしかできません。私。ファイルの分割方法を定義するfileinputformatを作成します。分割されたレコードを別の作業者が処理できると言うと、あなたは正しいです。私はfileinputformatがあなたの問題の有効な解決策かもしれないと思います! – Gurdt