2017-10-29 3 views
0

私は以下のようなデータを持っている:PySpark相当

+----+----+ 
|user|item| 
+----+----+ 
| a| 1| 
| a| 2| 
| a| 3| 
| b| 1| 
| b| 5| 
| b| 4| 
| b| 7| 
| c| 10| 
| c| 2| 
+----+----+ 

私は次のようにのようないくつかの変換後のデータを持っているしたいと思います:

(a,(a,1)) 
(a,(a,2)) 
(a,(a,3)) 
(b,(b,1)) 
(b,(b,5)) 
(b,(b,4)) 
(b,(b,7)) 
(c,(c,10)) 
(c,(c,2)) 

彼らは別々のRDDSかもしれません。それは私にとっては大丈夫だろう。

データセットとscalaとjavaのgroupbykeyとflatmapgroupsの組み合わせを使って行うことができますが、残念ながらpysparkにはデータセットやフラットマップグループはありません。

pypsarkでフラットマップとフラットマップの変換を試しましたが、正しい結果を得ることができませんでした。

pysparkを使用して予期した結果を得る方法はありますか?

答えて

0

下のコードをご覧ください。このコードスニペットを使用して解決策を見つけることができると思います。

[ルート@サンドボックス作業]#のHadoopのDFS -put SAMPLE.TXT /ユーザ/

SAMPLE.TXT

a|1 
a|2 
a|3 
b|1 
b|5 
b|4 
b|7 
c|10 
c|2 

[ルート@サンドボックス作業は]

を#pyspark
lines = sc.textFile("hdfs://sandbox/user/sample.txt") 

def parse(line): 
    return (line.split('|')[0], (line.split('|')[0], line.split('|')[1])) 

parsed_lines = lines.map(parse) 

parsed_lines.collect() 

[(u'a', (u'a', u'1')), (u'a', (u'a', u'2')), (u'a', (u'a', u'3')), (u'b', (u'b', u'1')), (u'b', (u'b', u'5')), (u'b', (u'b', u'4')), (u'b', (u'b', u'7')), (u'c', (u'c', u'10')), (u'c', (u'c', u'2'))]