2016-09-12 6 views
2

Apache Spark(Pyspark)を初めて使用しています。この問題を解決する助けをしてくれることをうれしく思います。私は現在、Pyspark 1.6を使用しています(MQTTサポートがないので、2.0を捨てなければなりませんでした)。Apache Spark Countステートの変更

だから、私は、これは基本的にドアの時間とステータスです

+----------+-----------+ 
|  time|door_status| 
+----------+-----------+ 
|1473678846|   2| 
|1473678852|   1| 
|1473679029|   3| 
|1473679039|   3| 
|1473679045|   2| 
|1473679055|   1| 

、以下の情報を持つデータフレームを持っています。私はドアが開いた回数を計算する必要があります&が閉じました。だから、私は状態の変化を特定し、それぞれの状態について独立したカウンターを保持する必要があります。

私はこれに新しいので、これを実装する方法の考え方を考えるのは非常に難しいと感じています。もし誰かが正しい方向に私を指す考えを提案することができれば、それは素晴らしいでしょう。

ありがとうございます!

答えて

0

この場合、アキュムレータを使用して問題を解決する必要があります。 基本的に、3つのステータスの3つの異なるアキュムレータを作成します。

status_1 = sc.accumulator(0) 
status_2 = sc.accumulator(0) 
status_3 = sc.accumulator(0) 

あなたは、このアウトオブボックスのような操作を行うことができます何効率的な方法はありません、以下

if (status == 1): 
    status_1 += 1 
1

ような何かを行うことができます。

from pyspark.sql.window import Window 
from pyspark.sql.functions import col, lag 

df = sc.parallelize([ 
    (1473678846, 2), (1473678852, 1), 
    (1473679029, 3), (1473679039, 3), 
    (1473679045, 2), (1473679055, 1) 
]).toDF(["time", "door_status"]) 


w = Window().orderBy("time") 
(df 
    .withColumn("prev_status", lag("door_status", 1).over(w)) 
    .where(col("door_status") != col("prev_status")) 
    .groupBy("door_status", "prev_status") 
    .count()) 

これは縮尺通りではありません。 mapParitionsを試すことができます。私たちは、このように操作を行うことができ、これらの2つと

def merge(x, y): 
    """Given a pair of tuples: 
    (first-state, last-state, counter_of changes) 
    return a tuple of the same shape representing aggregated results 

    >>> merge((None, None, Counter()), (1, 1, Counter())) 
    (None, 1, Counter()) 
    >>> merge((1, 2, Counter([(1, 2)])), (2, 2, Counter())) 
    (None, 2, Counter({(1, 2): 1})) 
    >>> merge((1, 2, Counter([(1, 2)])), (3, 2, Counter([(3, 2)]) 
    (None, 2, Counter({(1, 2): 1, (2, 3): 1, (3, 2): 1})) 
    """ 
    (_, last_x, cnt_x), (first_y, last_y, cnt_y) = x, y 

    # If previous partition wasn't empty update counter 
    if last_x is not None and first_y is not None and last_x != first_y: 
     cnt_y[(last_x, first_y)] += 1 

    # Merge counters 
    cnt_y.update(cnt_x) 

    return (None, last_y, cnt_y) 

partials = (df.rdd 
    .mapPartitions(process_partition) 
    .collect()) 

reduce(merge, [(None, None, Counter())] + partials) 
+0

@ zero323ありがとうございます。最初の方法が働いた。あなたが述べたように、大量のデータでは遅く見えます。私はまだ2番目の方法を試しています。 – Sisyphus

+0

@DavidArenburg私が従うかわからない。あなたは詳しく説明できますか? – zero323

+1

@DavidArenburgいいえ、もちろんです。私は、異なるパーティションレイアウトでテストスイートを実行するために使用しました。ありがとうございました! – zero323

0

from collections import Counter 

def process_partition(iter): 
    """Given an iterator of (time, state) 
    return the first state, the last state and 
    a counter of state changes 

    >>> process_partition([]) 
    [(None, None, Counter())] 
    >>> process_partition(enumerate([1, 1, 1])) 
    [(1, 1, Counter())] 
    >>> process_partition(enumerate([1, 2, 3])) 
    [(1, 3, Counter({(1, 2): 1, (2, 3): 1}))] 
    """ 

    first = None 
    prev = None 
    cnt = Counter() 

    for i, (_, x) in enumerate(iter): 
     # Store the first object per partition 
     if i == 0: 
      first = x 

     # If change of state update couter 
     if prev is not None and prev != x: 
      cnt[(prev, x)] += 1 

     prev = x 

    return [(first, prev, cnt)] 

とシンプルな合併機能:拳さんは、私たちはパーティションをマップするために使用します関数を定義してみましょう次の解決策を試すことができます:

import org.apache.spark.sql.expressions.Window 

var data = Seq((1473678846, 2), (1473678852, 1), (1473679029, 3), (1473679039, 3), (1473679045, 2), (1473679055, 1),(1473779045, 1), (1474679055, 2), (1475679055, 1), (1476679055, 3)).toDF("time","door_status") 

data. 
    select(
    $"*", 
    coalesce(lead($"door_status", 1).over(Window.orderBy($"time")), $"door_status").as("next_door_status") 
). 
    groupBy($"door_status"). 
    agg(
    sum(($"door_status" !== $"next_door_status").cast("int")).as("door_changes") 
). 
    show 

スカラ言語です。あなたはPythonで同じものを作る必要があります。

0

私はそれをJavaで試してみましたが、実際にはPythonではdataframes APIも同様の方法で可能です。

次のことを行う:一時テーブルとしてデータフレーム/データセットと

  • あなたのデータをロード
  • レジスタのデータフレーム
  • このクエリを実行します。状態
  • BY doorstatesグループから状態、COUNT(*)を選択

ヘッダーを削除してください。