2017-02-15 6 views
1

イベントハブからメッセージを引き出し、スパーク/スパークストリーミングで処理するコードを正常に統合しました。私は今メッセージが通過するときに管理状態に移っています。これは、ほとんどの部分はhttps://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlスパークストリーミングとAzureイベントハブmapWithState

基本的に、これは、ダミーのソースで動作する、それが単一のパーティション上の単一のストリームで動作しますが、それはイオン化していないために動作しないの適応がある私が使用しているコードであり、私は各パーティションのストリーム1の複数のインスタンスを作成することができましたが、ちょっとユニオンとウィンドウのポイントを打ち負かしました。誰もが壮大な任意のアイデアを持っている場合、私はちょっと私は、私の問題を解決することで、私は直接ストリームを使用して終了し、すべての私の問題..今どこへ行くにインスピレーションを得るため

val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate() 

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10)) 
streamingContext.checkpoint(inputOptions.checkpointDir) 

//derive the stream and window 
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters) 
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10)) 

val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L))) 
val stateSpec = StateSpec.function(trackStateFunc _) 
    .initialState(initialRDD) 
    .numPartitions(2) 
    .timeout(Seconds(60)) 

val eventStream = eventHubsWindowedStream 
    .map(messageStr => { 
    //parse teh event 
    var event = gson.fromJson(new String(messageStr), classOf[Event]) 

    //return a tuble of key/value pair 
    (event.product_id.toString, 1) 
    }) 

val eventStateStream = eventStream.mapWithState(stateSpec) 

val stateSnapshotStream = eventStateStream.stateSnapshots() 
stateSnapshotStream.print() 

stateSnapshotStream.foreachRDD { rdd => 
    import sparkSession.implicits._ 
    rdd.toDF("word", "count").registerTempTable("batch_word_count") 
} 

streamingContext.remember(Minutes(1)) 

streamingContext 
+0

*統合されていないウィンドウストリームでは機能しません。 –

+0

お詫び申し上げますが、本質的に状態関数は決して呼び出されません。私はその点までデバッグできません。私は、サンプルコードを使用して、その罰金..と私は単一のストリームを使用して、その罰金..しかし、私はユニオン化されたストリームやウィンドウを使用するとき。 –

+0

あなたのIDEでローカルにデバッグを試しましたか? –

答えて

0

をこだわって去った。進行ディレクトリはHDFSまたはADLしかサポートしていないので、私はローカルでテストすることができません。ちょうど今

それでも、組合ストリームは動作しません..私は -

EventHubsUtils.createDirectStreams(> GetEventHubParams(inputOptions))streamingContext、inputOptions.namespace、 inputOptions.hdfs、地図(inputOptions.eventhub) HDFSの進捗ディレクトリを削除する方法を理解する必要があります!!

関連する問題