KafkaとSparkを介してAvroメッセージのストリームを処理している間に、処理済みのデータをElasticSearchインデックスに文書として保存しています。 は、ここでは、コード(簡体字)です:予想通りSparkタスクでElasticSearchにデータを保存する
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
MyType t = deserialize(encodedAvroData);
// Creating the ElasticSearch Transport client
Settings settings = Settings.builder()
.put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
IndexRequest indexRequest = new IndexRequest("index", "item", id)
.source(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
.doc(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
client.close();
すべてが動作します。唯一の問題はパフォーマンスです.SESへの保存には時間がかかります。これは、各RDDのESトランスポートクライアントを開いたり閉じたりするためです。 Spark documentationは、このアプローチがきわめて正しいことを示唆しています。私が理解するとすぐに、可能な最適化はrdd.foreachPartitionを使用していますが、パーティションが1つしかないので、これが有益であるとは確信していません。 パフォーマンスを向上させるための他のソリューションはありますか?
- なぜあなたはelasticsearch-のHadoopを使用していませんか? –