2016-05-25 6 views
2

私はSparkとHadoopエコシステムを初めて使い、すでに恋に落ちています。 今、私はSparkに既存のJavaアプリケーションを移植しようとしています。ファイル入力用の手動分割アルゴリズムの定義

このJavaアプリケーションは、次のように構成されています

  1. 読むファイル(複数可)入力データにいくつかの重いコンピューティングを行うカスタムパーサクラスとBufferedReaderと一つずつ。入力ファイルのサイズは、1〜最大2.5 GBです。
  2. メモリにデータを格納する(HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>
  3. メモリ内データストアをJSONとして書き出します。これらのJSONファイルはサイズが小さいです。

ファイルを1人の作業者が処理するScalaアプリケーションを作成しましたが、これは明らかにSparkから得られるパフォーマンス上のメリットではありません。

これをSparkに移植することで問題が発生しました。 入力ファイルは行ベースです。私は通常、1行に1つのメッセージがあります。ただし、一部のメッセージは、前の行に依存して、実際の有効なメッセージをパーサーに形成します。

  1. {タイムスタンプ}#0x033#{data_bytes} \ nは
  2. {タイムスタンプ}#0x034:例えば、私が入力ファイルに次の順序でデータを取得することを起こる可能性#{data_bytes} \ n
  3. {タイムスタンプ}#0x035#{data_bytes} \ n
  4. {タイムスタンプ}#0 "組成物メッセージ" 0x036のうち、パーサーも必要であることを実際のメッセージを形成するx0FE位{data_bytes} \ n
  5. {タイムスタンプ}#0x036#{data_bytes} \ n

メッセージ0x033、0x034、0x035の行他のメッセージもまた、これらの必要なメッセージのセットの間に入ることができる。しかし、ほとんどのメッセージは1行を読むことで解析できます。

最後に私の質問 私の目的のためにSparkにファイルを正しく分割させる方法を教えてください。ファイルを「ランダムに」分割することはできません。それらは、すべてのメッセージが解析され、パーサーが決して得ることのない入力を待たないように分割されなければなりません。つまり、各コンポジションメッセージ(前の行に依存するメッセージ)を1つの分割にする必要があります。

  • は、ファイル入力のためのマニュアルの分割アルゴリズムを定義します。

    は、私はそこに正しい出力を達成するために、いくつかの方法があるが、私は私もこの記事に持っていたいくつかのアイデアを投げますね?これは、分割の最後の数行に "大きな"メッセージ[0x033,0x034,0x035]の先頭が含まれていないことを確認します。

  • ファイルを分割しますが、最後の分割から次の分割に固定数の行を追加します(これは確かにジョブを実行します)。複数のデータはParserクラスによって正しく処理され、問題は導入されません。

2番目の方法は簡単かもしれませんが、私はSparkでこれをどのように実装するのか手がかりがありません。誰かが私を正しい方向に向けることができますか?

ありがとうございます!

答えて

1

私のblogpostへのあなたのコメントをhttp://blog.ae.be/ingesting-data-spark-using-custom-hadoop-fileinputformat/に見て、私の意見をここに伝えました。

まず第一に、私はあなたが何をしようとしているか完全にはわかりません。ここで助けてください:あなたのファイルに0x033、0x034、0x035、0x036を含む行が含まれていますので、Sparkはそれらを別々に処理しますか?実際にこれらの回線は一緒に処理する必要がありますか?

この場合、これを「破損した分割」と解釈しないでください。 blogpostで読むことができるように、Sparkは別々に処理できるファイルをレコードに分割します。デフォルトでは、改行にレコードを分割することでこれを行います。あなたの場合、あなたの "記録"は実際には複数の行に分散しています。だから、あなたはカスタムのfileinputformatを使うことができます。私はこれが最も簡単な解決策になるとは思わない。

これを解決するには、次のことを行うカスタムのfileinputformatを使用します。デフォルトのfileinputformatのように行単位で入力するのではなく、ファイルを解析して遭遇したレコード(0x033,0x034など)を追跡します。その間、0x0FEのようなレコードをフィルタリングすることができます(他の場所で使用したいかどうかはわかりません)。この結果、Sparkはこれらすべての物理レコードを1つの論理レコードとして取得します。

一方、ファンクションキー([オブジェクト33、0x033]、[オブジェクト33、0x034]など)を使用して、ファイルを1行ずつ読み込み、マップする方が簡単かもしれません。この方法で、選択したキーを使用してこれらの行を組み合わせることができます。

確かに他のオプションがあります。どちらを選択するかは、ユースケースによって異なります。

+0

なぜ私はもともとこの質問を始めました:私は、スパークが間違った行でファイルを分割する可能性があると考えました。 これはSparkが最後の行が「合成メッセージ」の開始点であるときに新しい分割を行うことを決めることができ、私の例では[0x033]を含んでいます。私のJava-Parsing-Codeは、各Splitが別のワーカーによって処理される可能性があるため、以前の行にアクセスすることができないと考えられています。 これは、私がカスタム入力形式で "前処理"する必要があると考えているため、これらの依存している行のメッセージを正しく解析できるかどうかを確認することです。 – j9dy

+0

2番目の提案が問題を解決する別の方法かもしれません。これを持っていただきありがとうございます。 私はそれに関するさらに多くの質問を持っています: - 私の各ファイルには50.000.000行までが含まれています。これはあなたが提案したフォーマットにラインをマッピングする際に問題を引き起こすでしょうか? - 後でこれらのKey/Value-List([オブジェクト33、0x033]、[オブジェクト33、0x034]、...)を処理する方法はありますか? - 名前が "オブジェクト33"のような1つのキーが合計値の70%のようになったらどうなりますか?これは何らかの形でさらなる処理のために分割されますか? – j9dy

+0

あなたの最初のコメントについて:Sparkは、あなたがそのようにプログラムするときに、別の場所に新しいスプリットを作ることしかできません。私。ファイルの分割方法を定義するfileinputformatを作成します。分割されたレコードを別の作業者が処理できると言うと、あなたは正しいです。私はfileinputformatがあなたの問題の有効な解決策かもしれないと思います! – Gurdt

関連する問題