2015-11-12 10 views
5

Scalaを使ってsparkでデータフレームを操作するのは難しいです。一意のエントリの列を抽出するデータフレームがある場合、groupByを使用するとデータフレームが返されません。sparkでgroupByを使用してDataFrameに戻す

machine_id | event  | other_stuff 
34131231 | thing  | stuff 
83423984 | notathing | notstuff 
34131231 | thing | morestuff 

と私は、私はいくつかのフィルタリングを行うことができるようにイベントが事の新しいDataFrameに格納されたユニークなマシンIDを希望:

例えば、私は次の形式を持っているDataFrameと呼ばれるログを持っていますある種の私が使用するお尻の痛みであるバックグループ化されたデータのヴァルを取得(または私が適切にオブジェクトのこの種を使用する方法がわからない)

val machineId = logs 
    .where($"event" === "thing") 
    .select("machine_id") 
    .groupBy("machine_id") 

を使用します。一意のマシンIDのリストを得たので、別のDataFrameをフィルタリングして個々のマシンIDのすべてのイベントを抽出したいと考えています。

私はかなり定期的にこの種のものをやりたいだろう見ることができますし、基本的なワークフローは次のとおりです。

  1. ログテーブルからユニークなidを抽出します。
  2. 固有IDを使用して、特定のIDのすべてのイベントを抽出します。
  3. 抽出されたこのデータに対して何らかの分析を行います。

これは最初の2つのステップですが、ここではいくつかの指針をお読みいただければ幸いです。

私はこの例がうまくいくと思っていますが、うまくいけば私の問題が何であるかを説明します。 GroupedDataオブジェクトについては十分に分かっていないかもしれませんが、(私が望んでいるように)これは簡単にデータフレームに何かがありません。 Scala 2.10.4上に構築されたspark 1.5を使用しています。

おかげ

答えて

7

ジャストdistinctないgroupByを使用します。

SQLと同等になります
val machineId = logs.where($"event"==="thing").select("machine_id").distinct 

SELECT DISTINCT machine_id FROM logs WHERE event = 'thing' 

GroupedDataを直接使用するためのものではありません。これは多くのメソッドを提供します。aggが最も一般的です。異なる集計関数を適用してDataFrameに変換するために使用できます。 SQLの面では、wheregroupBy後に持っているもの...aggまたは同等の方法によって提供されている必要があり、この

SELECT machine_id, ... FROM logs WHERE event = 'thing' GROUP BY machine_id 

のようなものと同じです。

1

sparkでグループ化してから集計し、select文でデータフレームを返します。あなたの例では、次のようになります:

val machineId = logs 
    .groupBy("machine_id", "event") 
    .agg(max("other_stuff")) 
    .select($"machine_id").where($"event" === "thing") 
関連する問題