2016-11-12 4 views
0

rddの各行にsend_active_mqを適用しようとしています。rdd.foreach pysparkに関数を適用する

def send_to_active_mq(json_string) : 
    k = str(json_string) 
    conn.send(body=k,destination='dwEmailsQueue2') 

json_rdd_to_send.foreach(send_to_active_mq) 

私は火花のドキュメント http://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#foreach

で提案されているのと同じ方法を適用しかし、私は次のエラーを取得しています。

AttributeError: 'builtin_function_or_method' object has no attribute '__code__' 

ここでは何か根本的に間違っていますか?

+2

使用しているスパークのバージョンは何ですか? 1.5.0はあなたに似たシリアライゼーションの問題の影響を受けました:https://issues.apache.org/jira/browse/SPARK-10542 – Mariusz

+0

私はスパーク1.6.1を使用しています –

+0

OK、どのようなタイプのデータです。このRDD含まれますか?これがdictの場合は、すべてのdictのキーと値の型をチェックします。 – Mariusz

答えて

1

これはおそらくconnオブジェクトに接続されています。お試しください:

def send_to_active_mq(json_strings): 
    conn = ... # Initalize connection 
    for json_string in json_strings: 
     conn.send(body=str(json_string) ,destination='dwEmailsQueue2') 

json_rdd_to_send.foreachPartition(send_to_active_mq) 
+0

上記の方法を試しましたが、うまくいきましたが、問題はforeachpartitionです。データを送信して終了する新しい接続を開きます。これを行う効率的な方法はありますか? –

+0

これが唯一の方法です。 PySparkには共有状態はありません。 –

+0

助けてくれてありがとう。 –

関連する問題