2017-01-10 6 views
2

私がやろうとしていることを説明するために少し工夫した例を使用しましょう。私は株価、株式数、および価格:{ symbol = "GOOG", count = 30, price = 200 }で、取引のストリームが入っていると想像してください。私は株の名前、この場合"Google"でこれらの出来事を豊かにしたいと思います。私は、例えば得データフロー内、PCollection<KV<String, String>>によって更新されるシンボル - >名前マッピングの「表」を維持し、このテーブルでの取引の私のストリームに参加したい。この目的のため データフローの「テーブル」にストリームに参加する

a PCollection<KV<Trade, String>>

これは、ストリーム処理アプリケーションの完全な基本的な使用例のようですが、データフローでこれを達成する方法を理解するのは苦労しています。私はそれがカフカストリームで可能であることを知っています。

私はではありません。は、検索のために外部データベースを使用したいと思います。この問題をDataflow内で解決するか、Kafka Streamsに切り替える必要があります。

+0

詳細データのソースについての詳細は教えてください。特に - それは静的ですか? Per-key?世界的に?それは複数のキーと結合されていますか?合計キーはいくつですか? –

+0

@SamMcVeety濃縮データは動的であり、常に更新されます(キーごとに1時間あたり約1つの新しい要素)。少なくとも何百万という鍵がありますが、おそらく何億というものもあります。 –

+0

ジョインがキーを意識していれば、両方のコレクションの同じキーを持つ要素が同じノードで処理され、適切なスケーリングが可能になることを確認することができます。 –

答えて

3

2つのオプションについて説明します。 1つは、現在のバージョンのDataflow(1.X)で動作するサイド入力を使用し、もう1つは、今後のDataflow(2.X)の一部であるDoFn内の状態を使用します。データフロー1.X用

ソリューション、ここでの一般的な考え方は、すべての労働者へのシンボル - >名前マッピングを利用できるようにマップ値side-inputを使用することです側入力に

を使用。

このテーブルはグローバルウィンドウ内にある必要があります(何も年齢が変わることはありません)。すべての要素がトリガされる必要があります(または新しい更新が生成されるように頻繁に実行する必要があります)。また、各シンボルの最新の名前を取るためにはいくつかのロジックが必要です。

このソリューションの欠点は、新しいエントリが入るたびにルックアップテーブル全体が再生成され、すぐにすべてのワーカーにプッシュされないことです。むしろ、それぞれは将来的に新しいマッピングを「ある時点で」得るでしょう。ハイレベルで

、このパイプラインは(私はこのコードをテストしていないので、いくつかの種類があるかもしれません)のようなものになります。私たちはここにviewAsMultiMapを使用していた

PCollection<KV<Symbol, Name>> symbolToNameInput = ...; 
final PCollectionView<Map<Symbol, Iterable<Name>>> symbolToNames = symbolToNameInput 
    .apply(Window.into(GlobalWindows.of()) 
     .triggering(Repeatedly.forever(AfterProcessingTime 
      .pastFirstElementInPane() 
      .plusDelayOf(Duration.standardMinutes(5))) 
     .accumulatingFiredPanes()) 
    .apply(View.asMultiMap()) 

注意を。つまり、すべてのシンボルの名前はとなります。すべてとなります。私たちは物事を見るときに、iterableで最新の名前を取る必要があります。ステートAPI

にこのソリューションを使用してデータフロー2.Xについて

PCollection<Detail> symbolDetails = ...; 
symbolDetails 
    .apply(ParDo.withSideInputs(symbolToNames).of(new DoFn<Detail, AugmentedDetails>() { 
    @Override 
    public void processElement(ProcessContext c) { 
     Iterable<Name> names = c.sideInput(symbolToNames).get(c.element().symbol()); 
     Name name = chooseName(names); 
     c.output(augmentDetails(c.element(), name)); 
    } 
    })); 

ソリューションは、今後のデータフロー2.0リリースの一部となる新機能を使用しています。プレビューリリース(現在はDataflow 2.0-beta1)の一部ではありませんが、利用可能な場合はrelease notesをご覧ください。

一般的な考え方では、キー付き状態では、特定のキーに関連付けられた値を保存できます。このケースでは、私たちが見た最新の「名前」の価値を覚えています。

ステートフルDoFnを実行する前に、各要素を共通の要素型(NameOrDetails)オブジェクトにラップします。これは次のようになります。

// Convert SymbolToName entries to KV<Symbol, NameOrDetails> 
PCollection<KV<Symbol, NameOrDetails>> left = symbolToName 
    .apply(ParDo.of(new DoFn<SymbolToName, KV<Symbol, NameOrDetails>>() { 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
     SymbolToName e = c.element(); 
     c.output(KV.of(e.getSymbol(), NameOrDetails.name(e.getName()))); 
    } 
    }); 

// Convert detailed entries to KV<Symbol, NameOrDetails> 
PCollection<KV<Symbol, NameOrDetails>> right = details 
    .apply(ParDo.of(new DoFn<Details, KV<Symbol, NameOrDetails>>() { 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
     Details e = c.element(); 
     c.output(KV.of(e.getSymobl(), NameOrDetails.details(e))); 
    } 
}); 

// Flatten the two streams together 
PCollectionList.of(left).and(right) 
    .apply(Flatten.create()) 
    .apply(ParDo.of(new DoFn<KV<Symbol, NameOrDetails>, AugmentedDetails>() { 
    @StateId("name") 
    private final StateSpec<ValueState<String>> nameSpec = 
     StateSpecs.value(StringUtf8Coder.of()); 

    @ProcessElement 
    public void processElement(ProcessContext c 
     @StateId("name") ValueState<String> nameState) { 
     NameOrValue e = c.element().getValue(); 
     if (e.isName()) { 
     nameState.write(e.getName()); 
     } else { 
     String name = nameState.read(); 
     if (name == null) { 
     // Use symbol if we haven't received a mapping yet. 
     name = c.element().getKey(); 
     } 
    c.output(e.getDetails().withName(name)); 
    } 
    }); 

+0

ありがとうございました。これは...ほとんどのストリーミングアプリケーションに必要と思われるものを達成するために書くための狂った量のコードです。これは実際に実際に行われていますか?ロードマップには、より高いレベルの構成要素はありませんか? –

関連する問題