BigQueryから特定のユーザーイベントを取得し、ユーザーごとのイベントカウントを生成する単純なDataflow Pythonパイプラインを実行しようとしています。 CombinePerKey
は、変換前のGoogle Cloud Dataflow PythonでCombinePerKeyを使用
TypeError: Expected tuple, got int [while running 'Map(<lambda at user_stats.py:...>)']
データこの形式である:
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'2296845644499670', 1)
(u'2296845644499670', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
代わりにこれでuser_event_counts
を計算する場合:
user_event_counts = (user_events|df.GroupByKey()|
df.Map('count', lambda (user, ones): (user, sum(ones))))
これは私にエラーを与えて実行すると、
p = df.Pipeline(argv=pipeline_args)
result_query = "..."
data = p | df.io.Read(df.io.BigQuerySource(query=result_query))
user_events = data|df.Map(lambda x: (x['users_user_id'], 1))
user_event_counts = user_events|df.CombinePerKey(sum)
エラーがなく、私が期待した結果が得られます。
docsに基づいて、両方のアプローチから同様の動作が期待されます。私は明らかにCombinePerKey
に関して何かを見逃していましたが、私はそれが何かを見ることができません。任意のヒントをいただきました!