1
私はSpark初心者です。スパークrddフィールドの値を別の値で置き換えてください。
私が使用して私のelasticsearchデータベースの最初のRDDの内容を見ることができます:私も私のDSTREAMのために必要な値を使用して取得することができ
print(es_rdd.first())
>>>(u'1', {u'name': u'john'})
を:
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers})
name=kvs.map(lambda x: x[1])
name.pprint()
>>>>robert
を私は交換するつもりrdd "name": "john"を "robert"で置き換えて、elasticAsarchにsaveAsNewAPIHadoopFile()で新しいrddを挿入してください。
どうすればいいですか? "robert"を新しいrddにマップする方法はありますか?以下のような何か...
new_rdd=es_rdd.map(lambda item: {item[0]:name})
おかげ