2017-01-30 4 views
0

私はSparkアプリケーションを開発しました。私はSpark 1.3を使用しなければならないので、ウィンドウ関数は使用できません。私は要素の単一のグループを反復することを選択し、キーによってグループを作成しました。私が今までに見つけた解決策は、キーを収集し、ルックアップ(キー)を使って対応するRDDを取得することです。私のアプローチは非常に非効率的であることは分かっていますが、RDDをリストに変換して別の方法で別のリストを返す関数をどのように適用するのか分かりません。キーでグループ分けされたRDDでスパーク繰り返し

logon_dhcp = logons.map(lambda logon: (logon.user, (logon.dhcpscopename, logon.city, logon.timestamp))) 
logon_dhcp = logon_dhcp.groupByKey() 

dhcp_change_list = [] 
for key in logon_dhcp.keys().collect(): 
    new_list = dhcp_changed(key,logon_dhcp.lookup(key)) 
    dhcp_change_list = list(set().union(dhcp_change_list,new_list)) 

def dhcp_changed(key,group): 
    values = list(group[0]) 
    values_sorted = sorted(values, key=lambda tup: tup[2]) 
    prevCity = None 
    prevValue = None 
    prevTime = None 
    res = list() 
    for value in values_sorted: 
     if prevCity != None and prevCity != value[1] and notEnoughTime(prevTime,value[2]): 
      res.append((key, prevTime.strftime('%Y-%m-%d %H:%M:%S'), prevCity, value[2].strftime('%Y-%m-%d %H:%M:%S'), value[1])) 
     prevCity = value[1] 
     prevTime = value[2] 
     prevValue = value 
    return res 

私はaggregateByKey()のように同じことをすることができますか?

答えて

0

ように修正機能付き(キー、IterableList)

result = logon_dhcp.map(lambda x: dhcp_changed(x)) 

RDD形式で既にあることから、簡単な地図作品、OK:

def dchp_changed(group): 
    key = str(group[0]) 
    values = list(group[1]) 

私のコードのを改善するための任意の提案をパフォーマンスは歓迎です

関連する問題