2016-11-11 5 views
3

Elastic Search on Elastic Searchにバージョン2.4.0のペアを書き込もうとしています。 私はelasticsearch-spark_2.10-2.4.0プラグインを使用してESに書き込みます。ここ は、私がESへの書き込みに使用していたコードです:スパークからエラスティックサーチへのrddの書き込みに失敗しました

def predict_imgs(r): 
    import json 
    out_d = {} 
    out_d["pid"] = r["pid"] 
    out_d["other_stuff"] = r["other_stuff"] 

    return (r["pid"], json.dumps(out_d)) 

res2 = res1.map(predict_imgs) 

es_write_conf = { 
"es.nodes" : image_es, 
#"es.port" : "9243", 
"es.resource" : "index/type", 
"es.nodes.wan.only":"True", 
"es.write.operation":"upsert", 
"es.mapping.id":"product_id", 
"es.nodes.discovery" : "false", 
"es.net.http.auth.user": "username", 
"es.net.http.auth.pass": "pass", 
"es.input.json": "true", 
"es.http.timeout":"1m", 
"es.scroll.size":"10", 
"es.batch.size.bytes":"1mb", 
"es.http.retries":"1", 
"es.batch.size.entries":"5", 
"es.batch.write.refresh":"False", 
"es.batch.write.retry.count":"1", 
"es.batch.write.retry.wait":"10s"} 

res2.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

次のように私が手にエラーがある:

Py4JJavaError: An error occurred while calling  z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 

興味深い部分は、私は最初の数のテイクを行うときに、この作品でありますRDD2上の要素と、それから新しいRDDを作成し、ESにそれを書き、それが完璧に動作します:私は弾性クラウド(弾性検索のクラウドサービス)とDatabricks(雲のを使用しています

x = sc.parallelize([res2.take(1)]) 
x.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

Apache Sparkの運営) ESがSparkのESへの書き込みスループットに追いつけないことはありますか? 私はElastic Cloudのサイズを2GB RAMから8GB RAMに増やしました。

上記のes_write_confにはどのような推奨設定がありますか?あなたが考えることができる他のconfs? ES 5.0へのアップデートは役に立ちますか?

何か助けていただければ幸いです。これで数日間は苦労してきました。ありがとうございました。

答えて

2

これは、pysparkの計算に問題があるように見えますが、必然的に弾性検索保存プロセスではありません。して、RDDSがOKであることを確認してください:

  1. がRDD1上RDD2にcount()を実行
  2. (結果 "実体化" へ)count()を実行

数がOKであれば、ESに保存する前に、キャッシングの結果としてみてください:

res2.cache() 
res2.count() # to fill the cache 
res2.saveAsNewAPIHadoopFile(... 

問題がまだ(あなたがSでエグゼキュー]タブでそれらを見つけることができ、死んだのエグゼキュータの標準エラー出力と標準出力を見てみてください表示されますparkUI)。

また、非常に小さなバッチサイズがes_write_confになっていることに気付きました。パフォーマンスを向上させるには、500または1000に増やしてみてください。

関連する問題