2017-09-08 3 views
1

私はデータの前処理ルーチンに特化したクラスと異なるメソッドを実行し、次のDAGを、持っている:XComを使用してクラス間でデータを交換しますか?

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH') 

if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    from table_builder import OnlineOfflinePreprocess 
else: 
    print('Define MARKETING_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

default_args = { 
    'start_date': datetime.now(), 
    'max_active_runs': 1, 
    'concurrency': 4 
} 

worker = OnlineOfflinePreprocess() 

DAG = DAG(
    dag_id='marketing_data_preproc', 
    default_args=default_args, 
    start_date=datetime.today() 
) 

import_online_data = PythonOperator(
    task_id='import_online_data', 
    python_callable=worker.import_online_data, 
    dag=DAG) 

import_offline_data = PythonOperator(
    task_id='import_offline_data', 
    python_callable=worker.import_offline_data, 
    dag=DAG) 

merge_aurum_to_sherlock = PythonOperator(
    task_id='merge_aurum_to_sherlock', 
    python_callable=worker.merge_aurum_to_sherlock, 
    dag=DAG) 

merge_sherlock_to_aurum = PythonOperator(
    task_id='merge_sherlock_to_aurum', 
    python_callable=worker.merge_sherlock_to_aurum, 
    dag=DAG) 

upload_au_to_sh = PythonOperator(
    task_id='upload_au_to_sh', 
    python_callable=worker.upload_table, 
    op_args='aurum_to_sherlock', 
    dag=DAG) 

upload_sh_to_au = PythonOperator(
    task_id='upload_sh_to_au', 
    python_callable=worker.upload_table, 
    op_args='sherlock_to_aurum', 
    dag=DAG) 

import_online_data >> merge_aurum_to_sherlock 
import_offline_data >> merge_aurum_to_sherlock 

merge_aurum_to_sherlock >> merge_sherlock_to_aurum 
merge_aurum_to_sherlock >> upload_au_to_sh 
merge_sherlock_to_aurum >> upload_sh_to_au 

これは、次のエラーを生成します。

どのように与えられ、実際にはかなり明白である
[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info' 

気流が働きます:呼び出された異なるクラスメソッドからの出力は、グラフの上部で初期化されたグローバルクラスオブジェクトに格納されません。

XComでこれを解決できますか?全体として、OOPの一貫性を気流とどのように融合させるかについての考え方は何ですか?

答えて

3

気流のあるOOPと気流のある状態に関する問題は少なくなります。

タスク間で受け渡される必要のある状態は、永続的に格納する必要があります。これは、各気流タスクが独立したプロセス(別のマシンで実行されている可能性もあります)であり、メモリ内通信が不可能であるためです。

XCOMを使用してこの状態を渡すことができます(空気流データベースに格納されるため、小さい場合)。サイズが大きい場合は、おそらくファイルシステム、S3またはHDFS、または特殊データベースなどの別の場所に格納したいと思うかもしれません。

+0

大変です。私はかなり大きなテーブルを取り扱っているので、ここでは意味をなされることは、あなたが言及した非ローカルストレージオプションの1つです。 XComは、GBを渡すために作られたようではありません。ただ、私が何かを明らかにしていないことを確認したかっただけです。乾杯! – Aaron

関連する問題