2016-05-16 15 views
0

BigQueryから特定のユーザーイベントを取得し、ユーザーごとのイベントカウントを生成する単純なDataflow Pythonパイプラインを実行しようとしています。 CombinePerKeyは、変換前のGoogle Cloud Dataflow PythonでCombinePerKeyを使用

TypeError: Expected tuple, got int [while running 'Map(<lambda at user_stats.py:...>)'] 

データこの形式である:

(u'55107178236374', 1) 
(u'55107178236374', 1) 
(u'55107178236374', 1) 
(u'2296845644499670', 1) 
(u'2296845644499670', 1) 
(u'1489727796186326', 1) 
(u'1489727796186326', 1) 
(u'1489727796186326', 1) 
(u'1489727796186326', 1) 

代わりにこれでuser_event_countsを計算する場合:

user_event_counts = (user_events|df.GroupByKey()| 
    df.Map('count', lambda (user, ones): (user, sum(ones)))) 
これは私にエラーを与えて実行すると、

p = df.Pipeline(argv=pipeline_args) 
result_query = "..." 
data = p | df.io.Read(df.io.BigQuerySource(query=result_query)) 
user_events = data|df.Map(lambda x: (x['users_user_id'], 1)) 
user_event_counts = user_events|df.CombinePerKey(sum) 

エラーがなく、私が期待した結果が得られます。

docsに基づいて、両方のアプローチから同様の動作が期待されます。私は明らかにCombinePerKeyに関して何かを見逃していましたが、私はそれが何かを見ることができません。任意のヒントをいただきました!

答えて

1

SDKのバージョンが0.2.4より低いと思われます。 これは、いくつかのシナリオで組み合わせ操作を処理する方法のバグです。この問題はSDK(v0.2.4)の最新リリースで修正されています。https://github.com/GoogleCloudPlatform/DataflowPythonSDK/releases/tag/v0.2.4 ごめんなさい。最新のリリースで引き続き問題が発生した場合は、お知らせください。

関連する問題