5

私はCassandraから読み取り、データを処理/変換/フィルタリングし、その結果をElasticsearchに書き込みます。私は統合テストにドッカーを使用しています。私はスパークからElasticsearchに書き込むのに苦労しています。Elasticsearch-Hadoopライブラリがドッキング・コンテナに接続できません

依存性:私はセットアップに私のテンプレートとインデックス別名

をのtransportClientを使用してelasticsearchに接続することができ、私のユニットテストで

"joda-time"    % "joda-time"   % "2.9.4", 
"javax.servlet"   % "javax.servlet-api" % "3.1.0", 
"org.elasticsearch"  % "elasticsearch"  % "2.3.2", 
"org.scalatest"   %% "scalatest"   % "2.2.1", 
"com.github.nscala-time" %% "nscala-time"  % "2.10.0", 
"cascading"    % "cascading-hadoop" % "2.6.3", 
"cascading"    % "cascading-local" % "2.6.3", 
"com.datastax.spark"  %% "spark-cassandra-connector" % "1.4.2", 
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", 
"org.elasticsearch"  % "elasticsearch-hadoop"  % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")), 
"org.apache.spark"  %% "spark-catalyst"   % "1.4.0" % "provided" 

。私は

EsSpark.saveToEs(
    myRDD, 
    "hot/mytype", 
    Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id") 
) 

を実行しようとすると、

val conf = new SparkConf().setAppName("test_reindex").setMaster("local") 
    .set("spark.cassandra.input.split.size_in_mb", "67108864") 
    .set("spark.cassandra.connection.host", cassandraHostString) 
    .set("es.nodes", elasticsearchHostString) 
    .set("es.port", "9200") 
    .set("http.publish_host", "") 
sc = new SparkContext(conf) 
esClient = TransportClient.builder().build() 
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300)) 
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet() 
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet() 
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet() 

は、しかし、私はこのスタックトレース

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) 
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) 
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) 
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) 
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) 
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) 
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) 
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) 
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) 
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) 
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) 
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) 
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) 
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) 
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

を受け取る。この作品に私は「ドッカネットワークを使用して確認することができ、接続しようとしている橋を点検正しいIPアドレス。

docker network inspect bridge 
[ 
{ 
    "Name": "bridge", 
    "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1", 
    "Scope": "local", 
    "Driver": "bridge", 
    "EnableIPv6": false, 
    "IPAM": { 
     "Driver": "default", 
     "Options": null, 
     "Config": [ 
      { 
       "Subnet": "172.17.0.0/16", 
       "Gateway": "172.17.0.1" 
      } 
     ] 
    }, 
    "Internal": false, 
    "Containers": { 
     "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": { 
      "Name": "analytics_rabbitmq_1", 
      "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4", 
      "MacAddress": "02:42:ac:11:00:04", 
      "IPv4Address": "172.17.0.4/16", 
      "IPv6Address": "" 
     }, 
     "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": { 
      "Name": "analytics_elasticsearch_1", 
      "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56", 
      "MacAddress": "02:42:ac:11:00:02", 
      "IPv4Address": "172.17.0.2/16", 
      "IPv6Address": "" 
     }, 
     "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": { 
      "Name": "analytics_cassandra_1", 
      "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6", 
      "MacAddress": "02:42:ac:11:00:03", 
      "IPv4Address": "172.17.0.3/16", 
      "IPv6Address": "" 
     } 
    }, 
    "Options": { 
     "com.docker.network.bridge.default_bridge": "true", 
     "com.docker.network.bridge.enable_icc": "true", 
     "com.docker.network.bridge.enable_ip_masquerade": "true", 
     "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0", 
     "com.docker.network.bridge.name": "docker0", 
     "com.docker.network.driver.mtu": "1500" 
    }, 
    "Labels": {} 
} 
] 

私はすべてローカルでMacBook/osxで実行しています。 TransportClientを使用してブラウザを使用してドッキング・コンテナに接続することができますが、EsSpark.saveToES(...)関数は常に失敗します。

.config("es.nodes.wan.only", "true") 

を設定することにより

+0

あなたのアプリ名は同じですか? – alpert

+1

'es.nodes.wan.only'パラメータをtrueに設定しようとしますか? –

答えて

0

はElasticsearch インジェスト・ノードを使用するかどうかを

es.nodes.ingest.only

(デフォルトはfalse)この問題を解決することができます。有効にすると、elasticsearch-hadoopは、クラスタ内のノード のインジェストを通じて、すべての の要求(ノード検出後、有効になっている場合)をルーティングします。このコンフィグレーション設定の目的は、で、パイプライン用のデータをノンインジェストノードから転送するコストを回避します。 Ingest Pipeline(上記のes.ingest.pipelineを参照)にデータを書き込む場合にのみ有効です。

関連する問題