1
私はデータの前処理ルーチンに特化したクラスと異なるメソッドを実行し、次のDAGを、持っている:XComを使用してクラス間でデータを交換しますか?
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
from table_builder import OnlineOfflinePreprocess
else:
print('Define MARKETING_PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime.now(),
'max_active_runs': 1,
'concurrency': 4
}
worker = OnlineOfflinePreprocess()
DAG = DAG(
dag_id='marketing_data_preproc',
default_args=default_args,
start_date=datetime.today()
)
import_online_data = PythonOperator(
task_id='import_online_data',
python_callable=worker.import_online_data,
dag=DAG)
import_offline_data = PythonOperator(
task_id='import_offline_data',
python_callable=worker.import_offline_data,
dag=DAG)
merge_aurum_to_sherlock = PythonOperator(
task_id='merge_aurum_to_sherlock',
python_callable=worker.merge_aurum_to_sherlock,
dag=DAG)
merge_sherlock_to_aurum = PythonOperator(
task_id='merge_sherlock_to_aurum',
python_callable=worker.merge_sherlock_to_aurum,
dag=DAG)
upload_au_to_sh = PythonOperator(
task_id='upload_au_to_sh',
python_callable=worker.upload_table,
op_args='aurum_to_sherlock',
dag=DAG)
upload_sh_to_au = PythonOperator(
task_id='upload_sh_to_au',
python_callable=worker.upload_table,
op_args='sherlock_to_aurum',
dag=DAG)
import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock
merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au
これは、次のエラーを生成します。
どのように与えられ、実際にはかなり明白である[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'
気流が働きます:呼び出された異なるクラスメソッドからの出力は、グラフの上部で初期化されたグローバルクラスオブジェクトに格納されません。
XCom
でこれを解決できますか?全体として、OOPの一貫性を気流とどのように融合させるかについての考え方は何ですか?
大変です。私はかなり大きなテーブルを取り扱っているので、ここでは意味をなされることは、あなたが言及した非ローカルストレージオプションの1つです。 XComは、GBを渡すために作られたようではありません。ただ、私が何かを明らかにしていないことを確認したかっただけです。乾杯! – Aaron