2016-03-23 10 views
3

私は現在、イベントを受信して​​Kafkaクラスタに公開する高速データ集約モジュールを設計中です。その後、KafkaとSpark Streamingの統合が行われました。 Spark StreamingはKafkaからストリームを読み込み、計算を実行します。計算が完了したら、結果を別のアプリケーションに送る必要があります。このアプリケーションは、WebサービスまたはKafkaクラスターである可能性があります。他のアプリケーション/ KafkaにApache Sparkの結果を公開

どうすればいいですか?私が読んだことから、Spark Streamはデータベースやファイルシステムのように下流にデータをプッシュします。

このようなアプリケーションをデザインするにはどうしたらいいですか?結果を別のアプリケーションに公開するには、Spark StreamをStormに置き換える必要がありますか?

答えて

1

データを外部システムに送信できる強力なプリミティブであるdstream.foreachRDDを参照してください。以下は
Design Patterns for using foreachRDD

が最適化されていない、ただPOCのために、KafkaProducerオブジェクトはforeachRDDで再利用することができる)あなたの参照のための私のカフカ統合コードです:私はどのように疑問に思って

DStream.foreachRDD(rdd => { 
     rdd.foreachPartition { partitionOfRecords => 
     val kafkaProps = new Properties() 
     kafkaProps.put("bootstrap.servers", props("bootstrap.servers")) 
     kafkaProps.put("client.id", "KafkaIntegration Producer"); 
     kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
     val producer = new KafkaProducer[String, String](kafkaProps); 

     partitionOfRecords.foreach(record => { 
     val message = new ProducerRecord[String, String]("hdfs_log_test", record.asInstanceOf[String]) 
      producer.send(message) 
     }) 
     producer.close() 
     } 
    }) 
1

できるよ?私が読んだことから、Spark Streamはデータベースやファイルシステムのように下流にデータをプッシュします。

SparkはHDFSまたはデータベースに限定されず、利用可能な外部リソースへの接続を自由に初期化することができます。 Kafka、RabbitMQ、またはWebServiceに戻ることができます。

mapfilterreduceByKeyなどのような単純な変換を行っている場合は、DStream.foreachRDDを使用すると問題ありません。 DStream.mapWithStateのようなステートフルな計算を実行する場合は、状態の処理が完了したらすぐに外部のサービスにデータを送信できます。

たとえば、データの入力ストリームとしてKafka、ステートフルな計算を行った後のRabbitMQおよび出力を使用しています。

関連する問題