2016-12-30 5 views
1

私はSpark初心者です。スパークrddフィールドの値を別の値で置き換えてください。

私が使用して私のelasticsearchデータベースの最初のRDDの内容を見ることができます:私も私のDSTREAMのために必要な値を使用して取得することができ

print(es_rdd.first()) 
>>>(u'1', {u'name': u'john'}) 

を:

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers}) 
name=kvs.map(lambda x: x[1]) 
name.pprint() 
>>>>robert 

を私は交換するつもりrdd "name": "john"を "robert"で置き換えて、elasticAsarchにsaveAsNewAPIHadoopFile()で新しいrddを挿入してください。

どうすればいいですか? "robert"を新しいrddにマップする方法はありますか?以下のような何か...

new_rdd=es_rdd.map(lambda item: {item[0]:name}) 

おかげ

答えて

2

私たちはインデックスのリストによると、別のRDDとRDDの一部を置き換えることができます。たとえば、(RDD)の要素を1,2,3,4から2,3,4,4に置き換えます。

a = sc.parallelize([1,2,3,4]) 
repVals = sc.parallelize([2,3,4]) 
idx = sc.parallelize([0,1,2]) . # idx has the same number of values with repVals 

a = a.zipWithIndex() 
ref = idx.zip(repVals).collectAsMap() # create a dictionary of format {idex:repValue} 

anew = a.map(lambda x:ref[x[1]] if x[1] in ref else x[0]) 
anew.collect() 

結果ショー[2,3,4,4-]

関連する問題