2016-04-06 15 views
0

SparkRを効率的に使用するために、プレーンなRコードをSparkRに変換しています。SparkRで列の長さを見つけるには

私は以下の列のCloseDateを持っています。

CloseDate 
2011-01-08 
2011-02-07 
2012-04-07 
2013-04-18 
2011-02-07 
2010-11-10 
2010-12-09 
2013-02-18 
2010-12-09 
2011-03-11 
2011-04-10 
2013-06-19 
2011-04-10 
2011-01-06 
2011-02-06 
2013-04-16 
2011-02-06 
2015-09-25 
2015-09-25 
2010-11-10 

日付を増やした回数をカウントしたいですか?私はそれを行うために以下のRコードを持っています。

dateChange <- function(closeDate, dir){ 
    close_dt <- as.Date(closeDate) 
    num_closedt_out = 0 
    num_closedt_in = 0 

    for(j in 1:length(close_dt)) 
    { 
    curr <- close_dt[j] 
    if (j > 1) 
     prev <- close_dt[j-1] 
    else 
     prev <- curr 
    if (curr > prev){ 
     num_closedt_out = num_closedt_out + 1 
    } 
    else if (curr < prev){ 
     num_closedt_in = num_closedt_in + 1 
    } 
    } 
    if (dir=="inc") 
    ret <- num_closedt_out 
    else if (dir=="dec") 
    ret <- num_closedt_in 
    ret 
} 

私はここでSparkR df $ colを使用しようとしました。スパークが遅れてコードを実行するので、この実行中に長さの値を取得せず、NaNエラーを取得しました。

私が試した変更されたコードは次のとおりです。

DateDirChanges <- function(closeDate, dir){ 
    close_dt <- to_date(closeDate) 
    num_closedt_out = 0 
    num_closedt_in = 0 

    col_len <- SparkR::count(close_dt) 
    for(j in 1:col_len) 
    { 
    curr <- close_dt[j] 
    if (j > 1) 
     prev <- close_dt[j-1] 
    else 
     prev <- curr 
    if (curr > prev){ 
     num_closedt_out = num_closedt_out + 1 
    } 
    else if (curr < prev){ 
     num_closedt_in = num_closedt_in + 1 
    } 
    } 
    if (dir=="inc") 
    ret <- num_closedt_out 
    else if (dir=="dec") 
    ret <- num_closedt_in 
    ret 
} 

このコードの実行中に列の長さを取得するにはどうすればよいですか?それとも、それをするのが他に良いですか?

答えて

2

Columnには長さがないため、できません。 Rの列とは異なり、列はデータ表現ではなく、SQL式と特定のデータ変換を表します。さらに、Spark DataFrameの値の順序は任意ですので、単に見過ごすことはできません。

以前の質問のようにデータを分割することができる場合は、表示されているのと同じ方法でウィンドウ関数を使用することができます。in the answer to your previous questionそれ以外の場合は、SparkRのみを使用してこれを処理する効率的な方法はありません。

SELECT 
    CAST(LAG(CloseDate, 1) OVER w > CloseDate AS INT) gt, 
    CAST(LAG(CloseDate, 1) OVER w < CloseDate AS INT) lt, 
    CAST(LAG(CloseDate, 1) OVER w = CloseDate AS INT) eq 
FROM DF 
WINDOW w AS (
    PARTITION BY partition_col ORDER BY order_col 
) 
+0

私は私のようにデータを分割することができると思う:あなたが必要とするすべては、このようなものであるがために(必要な)を決定する方法であり、あなたが(妥当なパフォーマンスを得るために必要な)データを分割することができますと仮定

よくしかし、そこでは必要な出力を得るためにdatediffを使用します。しかしここで私はカスタム関数を書く必要があります。同様に、LAGから値が増減するかどうかをチェックし、値が増減した回数だけ戻す必要があります。したがって、このカスタム関数は新しい列を作成するためにデータを読み取る必要があります。それを行う可能性はありますか? – sag

+0

注文(必須)とパーティション(パフォーマンス用)を決定する方法がある限り、それはかなり簡単です。 – zero323

+0

これは前の質問と全く同じです。私はちょうどtempTableに固執し、遅れを得ました。そこでは、違いを得るためにdatediffを使用します。ここでは、getIncrementCount(df $ closeDate、df $ lagCloseDate)のような記述が必要です。その機能では、反復してカウントを維持する必要があります。このカウントは、closeDateがlagCloseDate以上になるたびに1ずつ増加する必要があります。私はSparkRが提供するいくつかのデフォルト関数を参照しましたが、それらのすべてがそれを行うためにJavaを呼びます。それは可能ですか?R?質問があまりにも愚かであれば申し訳ありません。私はRとSparkRにとってとても新しいです – sag

関連する問題