私はSparkとScalaの初心者です。スパークストリーミングを使用してリアルタイムストリーミングデータ/ログを処理するにはどうすればよいですか?
私は、Kafka Publisherから1分ごとに[1GB程度のJSONログライン/分を取り込む]ネットワークログを読み込み、最後にElasticSearchに集計値を保存することができるREAL TIME Spark Consumerを実装したいと考えています。
集約はコンポジットキー[クライアントMAC、クライアントIP、サーバーMAC、サーバーIPなど]を使用していくつかの値[like bytes_in、bytes_outなど]に基づいています。私が書いた
スパーク消費者は次のとおりです。
object LogsAnalyzerScalaCS{
def main(args : Array[String]) {
val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION")
sparkConf.set("es.nodes", "my ip address")
sparkConf.set("es.port", "9200")
sparkConf.set("es.index.auto.create", "true")
sparkConf.set("es.nodes.discovery", "false")
val elasticResource = "conrec_1min/1minute"
val ssc = new StreamingContext(sparkConf, Seconds(30))
val zkQuorum = "my zk quorum IPs:2181"
val consumerGroupId = "LogsConsumer"
val topics = "Logs"
val topicMap = topics.split(",").map((_,3)).toMap
val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
val logJSON = json.map(_._2)
try{
logJSON.foreachRDD(rdd =>{
if(!rdd.isEmpty()){
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = sqlContext.read.json(rdd)
val groupedData =
((df.groupBy("id","start_time_formated","l2_c","l3_c",
"l4_c","l2_s","l3_s","l4_s")).agg(count("f_id") as "total_f", sum("p_out") as "total_p_out",sum("p_in") as "total_p_in",sum("b_out") as "total_b_out",sum("b_in") as "total_b_in", sum("duration") as "total_duration"))
val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
dataForES.saveToEs(elasticResource)
dataForES.show();
}
})
}
catch{
case e: Exception => print("Exception has occurred : "+e.getMessage)
}
ssc.start()
ssc.awaitTermination()
}
object SQLContextSingleton {
@transient private var instance: org.apache.spark.sql.SQLContext = _
def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = {
if (instance == null) {
instance = new org.apache.spark.sql.SQLContext(sparkContext)
}
instance
}
}
}
私はすべての私のアプローチでは正確であるかどうかを知りたいのですが、すべての最初に[私は1分ログの集約が必要考慮]?
このコードを使用して問題があるようです:
- この消費者は、カフカブローカーから30秒ごとに をデータを取得し、それゆえ、その30 秒のデータのためにElasticsearchする最終集計を保存します ユニークキー[少なくとも1分間に2つのエントリ]のElasticsearchの行数を増やします。 UIツール[木場屋]がさらに集計する必要があるとします。 のポーリング時間を30秒から60秒に増やすと、 には多くの時間がかかり、したがってリアルタイムでは残りません。
- 私は、ElasticSearchでキーごとに1行だけ 行を保存するように実装したいと思います。したがって、 カフカブローカーから[1分単位]を取得する私のデータセットに新しいキー値を取得していない時間まで、 という集計を実行したいとします。 私はこれを groupByKey()とupdateStateByKey()関数を使用して達成できることがわかりましたが、私はこれをどのように私のケースで使用することができるかを確認することができません [JSONを変換する必要があります フラットな値を持つログ行の文字列を使用して、 これらの関数]を使用しますか?これらの関数を使用する場合は、 を最終的な集計値をElasticSearchに保存する必要がありますか?
- これを達成する他の方法はありますか?
あなたの迅速なヘルプは高く評価されます。
よろしく、 Bhupesh
アプローチがよさそうです。スループットを向上させるために、エグゼキュータを追加して起動することはできますか? – maasg
応答ありがとうございます。私が記載したポイントで確認してください。 –