スタンドアロンのスパークの中で私はデータフレームからElasticsearchに書き込もうとしています。私はそれを動作させることができますが、私が理解できないことは、 'index_name- {ts_col:{YYYY-mm-dd}}'のようなフォーマットの動的な名前付きインデックスに書き込む方法です。 'ts_col'はデータセットのdatetimeフィールド。elasticsearch-hadoop/sparkを使用して、フォーマットされた日付の動的に作成されたElasticsearchインデックスに書き込むことはできますか?
文法の種類はうまくいくはずだと私は思っていましたが、試してみると一番下にエラーがあります。最初に索引が作成される前に存在するかどうかを調べているようですが、動的に作成された索引名ではなく、書式なしの索引名を渡しています。私はpython elasticsearchモジュールを使用して同じ構文でインデックスを作成しようとしましたが、動的インデックス名は処理できません。
解決策はありますか?spark内のデータセットをループして、それぞれの日付を検索し、必要なインデックスを作成し、各インデックスに一度に1つずつ書き込む必要があります?私は明白な何かを欠いていますかLogstashはこれを簡単に実行しますが、なぜSpark内で動作させることができないのですか?ここで
は、私が使用している書き込みコマンドは(あまりにもそれのさまざまなバリエーションを試してみました)です:
df.write.format("org.elasticsearch.spark.sql")
.option('es.index.auto.create', 'true')
.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
.option('es.mapping.id', 'es_id')
.save()
ここで私が使用しているJARの:
elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar
は、ここで私、私が取得エラーです上記の書き込みコマンドを使用します。
ERROR NetworkClient: Node [##.##.##.##:9200] failed (Invalid target URI [email protected]/index_name-{ts_col:{YYYY.mm.dd}}/type_name); selected next node [##.##.##.##:9200]
...
...
Py4JJavaError: An error occurred while calling o114.save. : org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed;
そして、私はTrueに上書き設定している場合、私が取得:
Py4JJavaError: An error occurred while calling o58.save. : org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: no such index null at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)
そして、私は先に私が得る時間のインデックスを作成するためにElasticsearchのPythonクライアントを使用しようとすると:
RequestError: TransportError(400, u'invalid_index_name_exception', u'Invalid index name [index_name-{ts_col:YYYY.MM.dd}], must be lowercase')
遅れて申し訳ありませんが、ついにこれを試してみました。私は2つの問題があった。私は中括弧が多すぎて、日付欄だけでなく、タイムスタンプ欄も使用していました。新しい日付列を追加すると、それに基づいてインデックスを簡単に作成することができました。 df.write \t .format( "org.elasticsearch.spark.sql")\ \t .OPTION( 'es.index.auto.create'、 '真')\ を\:ここで働いていたサンプルコードです\t .OPTION( 'es.write.operation'、 'UPSERT') \t .mode( 'APPEND')\ \t .SAVE \ \t .OPTION( 'es.mapping.id'、 'ES_ID')(\ "%s- {es_date:YYYY.MM.dd} /%s"%(index、type)) – Jim