次のようにApache Flinkを使用したいと思います。私は別の流れのデータによって豊かにされなければならない1つのメインストリームを持っています。このメインストリームには、属性「site」と「timestamp」を持つ要素があります。他のストリーム(それをcountrystreamと呼ぶ)には、属性「サイト」と「国」があります。 countrystreamは、サイトに使用されている最新の国を追跡する必要があります。たとえば、("klm.com", "netherlands")
が最初に到着し、しばらくしてタプル("klm.com", "france")
が到着した場合、「klm.com」は「france」(これは後者であるため)を指す必要があります。したがって、状態を維持する必要があります。タプル( "klm.com"、100)がメインストリームに到着したとします。これで、("klm.com", 100, "france")
に充実しているはずです。いくつかのサイトがcountrystreamに見つからない場合は、 "?"で強調してください。たとえば、("stackoverflow.com", 150, "?")
です。どのように私はこれを達成することができますか?ストリームを別のストリームで強化する
0
A
答えて
0
解決策が見つかりました(しばらく時間がかかりました)。これは効率的ですか?それは改善することができますか?私の反復ストリームにチェックポイントを置くことができないということですか?
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
.iterate(
iteration => {
(iteration, iteration)
}
)
mainStream
.coGroup(infoStream)
.where[String]((x: String) => x)
.equalTo(_._2)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) {
(first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
first.foreach((key: String) => {
val matchingRecords = second
.filter(_._2 == key)
if (matchingRecords.nonEmpty) {
val matchingRecord = matchingRecords.maxBy(_._1)
out.collect((matchingRecord._2, matchingRecord._3))
}
}
)
}
}
.print()
env.execute("proof_of_concept")
関連する問題
- 1. spark streaming - あるストリームでtmpビューを作成し、別のストリームで使用する
- 2. 別のストリームを呼び出す前にストリームを待つ
- 3. AESで "ストリーム"を暗号化する
- 4. Javaストリーム> "orElseGet"を親ストリームにインライン化することは可能ですか?
- 5. クラスのストリームの初期化
- 6. 方法1は、別のストリーム
- 7. WinRTストリームの暗号化
- 8. 1つのストリームから別のストリームへパイプデータ
- 9. ストリームへのシリアル化とストリームからの読み取り
- 10. Java8ストリームは - 個別ストリームに重複を削除
- 11. C++はあるストリームから別のストリームにNバイトをコピーします
- 12. ストリームをグループ化すると、二回
- 13. 出力ストリームを「パラメータ化する」方法
- 14. RTMPストリームをMMSストリームに変換する
- 15. Jdomのストリームへのストリーム
- 16. エリクシルのCSVストリームへのストリーム
- 17. ストリームのストリーム:ストリームから変換できません<Object>
- 18. clearcaseでビューを強制的に親ストリームにする
- 19. Objective-CでのHTTPメッセージのトークン化ストリーム
- 20. C++での暗号化ストリームの使用
- 21. ストリーム1つのイーサネットケーブルでマルチスレッド化
- 22. TCPストリームを消費して別のシンクにリダイレクトします(Akkaストリームの場合)
- 23. TCPストリームに強制的にバッファの内容を送信する
- 24. 別のループにループを含むストリーム
- 25. RxJSでストリームを修正して初期ストリームを結合する
- 26. ストリーム
- 27. ストリーム
- 28. ストリーム
- 29. ストリーム
- 30. RxJSストリームを最新の別のストリームにマップするにはどうすればよいですか?