2016-03-23 17 views
0

最初の列にhh:mm:ssという形式のタイムスタンプを持つcsvログを読み込みたいとします。エントリをバケットに分割したいと思います。 Scalaのセマンティクス、つまりファイルをストリームとして読み込み、それを解析して(おそらくマッチ述語ですか?)、csvエントリをタプルとして出力するのが最も良い方法は何か不思議です。Scalaを使用してデータをバケットに分割して処理する方法

私はScalaを見てから数年経っていますが、この問題は特に言語に適しているようです。

ログのフォーマット例: [時間]、[文字列]、[INT]、[INT]、[INT]、[INT]、入力で[文字列]

最後のフィールドをにマッピングすることができ出力タプルのエミュムですが、その中に価値があるかどうかはわかりません。

私は使用できる一般的なレシピに満足していますし、問題に適した特定の組み込み関数の提案もあります。

全体的な目標はmap-reduceです。ここで私は時間ウィンドウ内の要素を数えたいが、それらの要素は並べ替えとカウントを行う前にregex replaceで前処理する必要がある。

私はこの問題を抽象的に保つように努めました。そのため、問題は次のパターンとして近づく可能性があります。

ありがとうございました。

+0

ファイルがすでに最初の列でソートされているかどうかは大きな違いがあります。 –

+0

本当に十分です。ログ順序はタイムスタンプによるものです。 –

答えて

1

おそらく、最初のパスとして、単純ながこのトリックを行いますか?

GROUPBYイディオム、およびいくつかのフィルタリングを使用して
logLines.groupBy(line => line.timestamp.hours) 
+0

このパターンはうまくいくかもしれませんが、ログの中の時間フィールドの変換、つまり文字列をパーティション化可能な型に変換することについて、より多くの洞察を得たいと考えていました。最初にタプルにラインを分割し、最初のエレメントからタイムスタンプフィールドを作成することをお勧めしますか? –

+0

あなたは確かにそうすることができます。おそらく、HH:mmコンポーネントに正規表現を適用しますか? –

0

、私の解決策は

val lines: Traversable[String] = source.getLines.map(_.trim).toTraversable 
val events: List[String] = lines.filter(line => line.matches("[\\d]+:.*")).toList 
val buckets: Map[String, List[String]] = events.groupBy { line => line.substring(0, line.indexOf(":")) } 

のように見えるこれは私の24のバケット、各時間の1を提供します。今度は、各バケットを処理し、正規表現の置換を実行してURIのパラメータを解除し、最後に各経路が発生した頻度を見つけるためにmap-reduceを行う必要があります。

重要なメモ。最初にTraversableストリームからListを作成せずに、groupByが必要に応じて動作しないことを知りました。そのステップがなければ、最終結果は1時間ごとの単一値マップになります。パーティション化する前にすべてのイベントをメモリーにロードする必要があるため、おそらく最もパフォーマンスの高いソリューションではありません。ストリームを分割できる優れたソリューションはありますか?おそらく、ストリームが処理されるときに変更可能なSetにイベントを追加するものがありますか?

関連する問題