2016-04-22 8 views
2

Graph.pregelアルゴリズムをGraphFrame.aggregateMessagesに移植しています。私は少し煩わしいGraphFrame APIを見つけています。Sparkで複雑なカラム構造を作成するためのショートカット

Graph APIでは、メッセージタイプとしてcase classを送信できます。しかし、GraphFrame APIでは、aggregateMessages.sendToSrc.sendToDstは、SQL式StringまたはColumnのいずれかで動作します。私はこれが尻の痛みであるほど強力であることを見出しています。

は、あなたが持っていると言う:私は Columnを構築する必要が GraphFramesIterator((1L, Send(Vote(yay = true), from = 2L)))

case class Vote(yay: Boolean, voters: Long = 1L) 
case class Send(vote: Vote, from: Long) 

が、私はのようなものかもしれないIterator[(VertexId,Send)]返しsendMsg、構築することができGraphXpregel機能を使います上記のサンプルよりも複雑な私の定義済みのcase classesを完全に破棄することなく、理想的にはIterator[(VertexId,Send)]と同じ目的を果たします。

これにはどのようなショートカットがありますか?

対応する構造体へcase classのインスタンスを変換するにはかなり簡単だった:私は今のところ得たもの

。これは主にそこに私を取得します。

def ccToStruct(cc: Product) : Column = { 
    val values = cc.productIterator 
    var seq = Seq[Column]() 
    while (values.hasNext) { 
    val field = values.next() match { 
     case p: Some[Product @unchecked] if (p.get.productArity > 0) => ccToStruct(p.get) 
     case p: Product if (p.productArity > 0) => ccToStruct(p) 
     case x => lit(x) 
    } 
    seq = seq :+ field 
    } 
    struct(seq:_*) 
} 

これは私が行うことができます:

ccToStruct(Send(Vote(true, 1L), 123L)) 
// res4: org.apache.spark.sql.Column = struct(struct(true,1),123) 

私はそれを正しく動作させるために少しスキーマをアップパッチを適用する必要があるだろうが、私はそれを行うために開始する前にこれはまったく役に立たないアプローチだと気付きました。 case classの値をstructに変換することは決してありません。ccToStruct(Send(Vote(true, 1L), 123L))は、かなり役に立たないメッセージを作成します。 lit()がケースクラスをサポートしていないことを除いて、lit(Send(..))値を送信するのと同じです。あなたの代わりに何をしたいのか

はミックスと一致​​3210値をAM.dst("*")AM.src("*")列ではなく、case classのスキーマに対応するそうすることです。 (私はケースクラスを完全に放棄することを考えましたが、UDAFsumメッセージがあり、そのロジックはケースクラスを使用している限り、移植が非常に簡単でした)。

私は答えが可能であると信じています。このような構造を作成:

import org.graphframes.lib.AggregateMessages 
val AM = AggregateMessages 

val msg = Seq[Any](Seq[Any](true, 1L), AM.src("id")) 

をそしてstruct()と私の場合クラスのスキーマを使用してColumnにそれを変換します。

誰もこれを行う良い方法がない場合(そしておそらく誰かが行っても)、私は自分の質問に答えて後で解決します。

答えて

0

ここで私が思いついたのです。

ケースクラスの構造を持つColumnオブジェクトを作成することですが、DataFrame.columnsにバインドする機能を使用して、私のプライマリデータ構造はSeq[Any]であると判断しました。 Seqは、私のケースクラスの構造と一致する必要があります。Seqは、基本的にケースクラスのコンストラクタ引数です。私の場合クラスがある場合:

case class Vote(yay: Boolean, voters: Long) 

それから私は、次の投票のようなSeq作成することができます。

val voteSeq = Seq[Any](true, 1L) 

をしかし、さらに興味深いのは、私が作成することができますので、私はSeq[Any]を使用する必要があります理由は、あります:

val boundVote = Seq[Any](true, AM.edge("voters")) 

私はColumnSeqを変換するために使用できる関数のカップルを思い付きました。私はColumnをSQL関数struct()で作成します。これはSQL文字列式でもすべて実行できます。しかし、代わりにColumnsに行くことに決めました。私はそれをできるだけきれいにしたいと思っていました。そして、String SQL式が乱雑になりました。あなたが正しく、構造体の中にあなたの列に名前を付けていない場合は

、あなたがどのように見える構造体を得る:つもりあなたはケースクラスにそれを変換しようとしているときに挑戦し、後に吸うだ

vote: struct (nullable = false) 
    |-- col1: boolean (nullable = false) 
    |-- col2: long (nullable = false) 

。代わりに、あなたはあなたが得るので、すべての列に対してasを使用する必要があります。

vote: struct (nullable = false) 
    |-- yay: boolean (nullable = false) 
    |-- voters: long (nullable = false) 

ソリューションはStructTypeを取り、フィールド名を作成するためにそれを使用することです。それが判明したので、私はすでにケースクラスからStructTypeを自動的に取り上げてカバーしていたので、簡単な部分でした。最初の関数は、ハードの部分を行う - それは再帰的Seqとスキーマの両方をウォークスルーし、最終的に最終的に没頭Columnsを生成します:struct(colSeq:_*)

def seqToColumnSchema(anySeq: Seq[Any], schema: StructType) : Column = { 
    var colSeq = Seq[Column]() 
    anySeq.zip(schema.fields).foreach{ case (value, field) => { 
    colSeq = colSeq :+ (value match { 
     case c: Column => c as field.name 
     case p: Seq[Any] if (p.length > 0) => { 
     field.dataType match { 
      case s: StructType => seqToColumnSchema(p, s) as field.name 
      case a: ArrayType => array(p.map(v => lit(v)):_*) as field.name 
      case x => lit(x) as field.name 
     } 
     } 
     case x => lit(x) as field.name 
    }) 
    }} 
    struct(colSeq:_*) 
} 

この第二の機能は、最初の単なるラッパーでありますそれはあなたが行うことができます:

seqToColumn[Vote](Seq(true, AM.edge("voters"))) 

代わりStructTypeを提供するので、あなただけの[...]

内側ケースクラスの名前を与える必要があり

すべてのこと、私はこれを行うことができますちょうどように:

import org.graphframes.lib.AggregateMessages 
val AM = AggregateMessages 

case class Vote(yay: Boolean, voters: Long) 

val voteSeq = Seq[Any](true, AM.edge("voters")) 
val voteMsg = seqToColumn[Vote](voteSeq) 
// voteMsg: org.apache.spark.sql.Column = struct(true AS yay#18,edge[voters] AS voters#19) 

graphFrame.aggregateMessages.sendToDst(voteMsg).agg(voteSum(AM.msg) as "out").printSchema 
root 
|-- id: long (nullable = false) 
|-- out: struct (nullable = true) 
| |-- vote: struct (nullable = false) 
| | |-- yay: boolean (nullable = false) 
| | |-- voters: long (nullable = false) 
+0

ハズレなしバグ - あなただけを指定する必要がありますする必要があり 'のSeq [任意]'配列(1.0、4L、123) 'ので、 'は' Seq [Any](1.0、4L、123) 'と同じではなく、2番目のものだけがあなたの値を互換性のある型にスカッシュしません。 –

関連する問題