Spark StreamingからレコードをKafkaにプッシュするサンプルコードを提供できますか?Spark StreamingからKafkaにデータをプッシュ
-1
A
答えて
0
スパークストリーミングを使用すると、カフカのトピックからデータを消費することができます。
あなたはカフカのトピックにレコードを公開したい場合は、カフカProducerを使用することができます[https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example]
それとも、複数のソースのコネクタを使用してカフカトピックにデータを公開するカフカConnectを使用することができます。[http://www.confluent.io/product/connectors/]
ご覧ください。 SparkストリーミングとKafkaの統合の詳細については、以下のリンクを参照してください。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
0
私はJavaを使用してそれを行っています。この関数をJavaDStream<String>
に渡して、.foreachRDD()
の引数として使用できます。各RDDに対してKafkaProducer
を作成するための最良の方法ではありません。socket example in Spark documentationのようなKafkaProducers
の「プール」を使用してこれを行うことができます。ここで
は私のコードです:
public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> {
private static final long serialVersionUID = 1L;
public void call(JavaRDD<String> rdd) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "loca192.168.0.155lhost:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1000);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
private static final long serialVersionUID = 1L;
public void call(Iterator<String> partitionOfRecords) throws Exception {
Producer<String, String> producer = new KafkaProducer<>(props);
while(partitionOfRecords.hasNext()) {
producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next()));
}
producer.close();
}
});
}
}
関連する問題
- 1. Spark Streaming Kafka backpressure
- 2. Spark Streaming Kafka Consumer
- 3. Kafka Streaming with apache spark
- 4. Kafka - Spark Streaming - 1つのパーティションからのデータの読み出し
- 5. Flafka(Http - > Flume - > Kafka - > Spark Streaming)
- 6. Spark Streaming with Kafka:空のコレクション例外
- 7. kafkaで他のVMからのSpark Streamingの使用方法
- 8. Spark Streamingの最初からKafkaトピックからレコードを読み取る方法は?
- 9. Spark Streaming Kafka Consumerの "java.io.NotSerializableException:org.apache.kafka.clients.consumer.ConsumerRecord"を修正するには?
- 10. kafka connect 0.10とSpark Structured Streamingでfrom_jsonを使用するには?
- 11. Spark Streamingを使用してKafkaからbinlogデータを読み取るときに "ClassNotFoundException"が発生しました
- 12. kafka-Spark-Streamingからデータを読み取っているときにEmptyが取得される
- 13. Spark StreamingでHbaseデータを読み取る
- 14. spark streaming fileStream
- 15. KafkaUtils java.lang.NoClassDefFoundError Spark Streaming
- 16. spark-streaming-kafka-0-10 auto.offset.resetは常にnoneに設定されています
- 17. dockerを使ってMacでkafkaとspark-streamingをセットアップする方法は?
- 18. FiloDB + Spark Streaming Data Loss
- 19. Spark SQL over Streaming - ArrayIndexOutOfBoundsException
- 20. Java Spark Streaming with Cassandra
- 21. Spark Streaming - updateStateByKeyとキャッシュデータ
- 22. YARNでKafkaから消費するとSpark Streamingアプリケーションが停止するのはなぜですか?
- 23. JSONのUpdateStateByKey SparkのKafkaのデータ
- 24. Kafka + Sparkストリーミング:ClosedChannelException
- 25. spark kafka security kerberos
- 26. Spark Structured Streamingで新しいデータがS3から取得されない
- 27. Spark Streamingの各カフカパーティションから同時に読み取る方法
- 28. KafkaトピックパーティションとSparkエグゼキュータマッピング
- 29. Spark-StreamingからElastic Searchへの出力のフィールド名のマッピング
- 30. Apache Sparkからのプッシュ通知