2016-12-01 7 views
3

データフローパイプラインのサイド入力としてHashMapを渡そうとしています。私はString、Int、Longのいずれかを渡す少数を除いて例を見つけることができません。私のコード:データフローのサイド入力としてHashMapを渡す方法

tagList = pipeline.apply(TextIO.Read.named("tagListTextRead").from("gs://mybucket/tag-list.json")); 

PCollection<Map<String,TagObject>> tagMap = tagList 
      .apply(ParDo.named("allTagsToTagMap").of(new Tags.BuildTagListMapFn())); 


PCollectionView<Map<String, TagObject>> tagMapView = 
      allTags.apply(View.<String, TagObject>asMap()); 

3番目のステートメントは構文エラーです。

The method apply(PTransform<? super PCollection<Map<String,TagObject>>,OutputT>) in the type 
    PCollection<Map<String,TagObject>> is not applicable for the arguments 
    (View.AsMap<String,TagObject>) 

データフローパイプラインのサイド入力としてHashMapを渡す方法を教えてもらえますか。

答えて

2

ここでは、パイプラインの詳細によって2つの異なる回答があります。あなたはPCollection<KV<K, V>>を持っている場合は

  1. あなたはPCollectionView<Map<K, V>>を生成するためにView.asMap()を使用することができます。自分でMapを作成する必要はありません。

  2. 単一の要素を持つPCollection<Map<K, V>>がある場合は、View.asSingleton()をサイド入力に使用できます。

最初は、おそらく最も自然で、あなたのコードは、中間値の種類を表示するには、この拡大

PCollectionView<Map<String, TagObject>> = pipeline 
    .apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json")) 
    .apply("tagsToKv", new Tags.TagToKvFunction()) 
    .apply("viewTags", View.<String, TagObject>asMap()) 

のように見えることになります。非常に多くの

PCollection<String> rawTags = 
    pipeline.apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json")) 

PCollection<KV<String, TagObject>> kvs = 
    rawTags.apply("tagsToKv", new Tags.TagToKvFunction()) 

PCollectionView<Map<String, TagObject>> = 
    kvs.apply("viewTags", View.<String, TagObject>asMap()) 
+0

感謝。これは私のために働いた。私のケースは#2だったので、asSingletonを使うことは私が必要としていたものでした。 –

関連する問題