0

KafkaとSparkを介してAvroメッセージのストリームを処理している間に、処理済みのデータをElasticSearchインデックスに文書として保存しています。 は、ここでは、コード(簡体字)です:予想通りSparkタスクでElasticSearchにデータを保存する

directKafkaStream.foreachRDD(rdd ->{ 

     rdd.foreach(avroRecord -> { 
      byte[] encodedAvroData = avroRecord._2; 
      MyType t = deserialize(encodedAvroData); 

    // Creating the ElasticSearch Transport client 
    Settings settings = Settings.builder() 
      .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build(); 
    TransportClient client = new PreBuiltTransportClient(settings) 
      .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300)); 

    IndexRequest indexRequest = new IndexRequest("index", "item", id) 
      .source(jsonBuilder() 
        .startObject() 
        .field("name", name) 
        .field("timestamp", new Timestamp(System.currentTimeMillis())) 
        .endObject()); 

    UpdateRequest updateRequest = new UpdateRequest("index", "item", id) 
      .doc(jsonBuilder() 
        .startObject() 
        .field("name", name) 
        .field("timestamp", new Timestamp(System.currentTimeMillis())) 
        .endObject()) 
      .upsert(indexRequest); 

    client.update(updateRequest).get(); 

    client.close(); 

すべてが動作します。唯一の問題はパフォーマンスです.SESへの保存には時間がかかります。これは、各RDDのESトランスポートクライアントを開いたり閉じたりするためです。 Spark documentationは、このアプローチがきわめて正しいことを示唆しています。私が理解するとすぐに、可能な最適化はrdd.foreachPartitionを使用していますが、パーティションが1つしかないので、これが有益であるとは確信していません。 パフォーマンスを向上させるための他のソリューションはありますか?

+0

- なぜあなたはelasticsearch-のHadoopを使用していませんか? –

答えて

0

RDDのレコードを処理するたびに新しいconnectを作成するためです。 したがって、foreachPartitionは、ES接続インスタンスを外部に持ってきて、ループ内で再利用するのに役立ちますので、1つのパーティションに関係なくパフォーマンスを向上させると思います。

関連する問題