2017-08-11 5 views
1

私はAirflow 1.8.1を使用しており、PostgreOperatorからのSQLリクエストの結果をプッシュしたいと思います。ここで気流:PostgreOperatorからxcom値をプッシュするには?

は私の仕事です:

check_task = PostgresOperator(
    task_id='check_task', 
    postgres_conn_id='conx', 
    sql="check_task.sql", 
    xcom_push=True, 
    dag=dag) 

def py_is_first_execution(**kwargs): 
    value = kwargs['ti'].xcom_pull(task_ids='check_task') 
    print 'count ----> ', value 
    if value == 0: 
     return 'next_task' 
    else: 
     return 'end-flow' 

check_branch = BranchPythonOperator(
    task_id='is-first-execution', 
    python_callable=py_is_first_execution, 
    provide_context=True, 
    dag=dag) 

、ここでは私のsqlスクリプトである:私はcheck_taskからXCOM値をチェックするとき

select count(1) from table 

それはnone値を取得します。

答えて

0

最後に、$AIRFLOW_HOME/pluginsの下にプラグインマネージャーの新しいSensor ExecuteSqlOperatorを作成しました。

例としてCheckOperatorを使用し、戻り値を変更しました。この演算子の基本的な実行は、私が必要としていたものとまったく逆です。ここ CheckOperator

と私のSqlSensorにカスタマイズされています:

はここでデフォルトExecuteSqlOperatorのだReverseSqlSensor

class SqlExecuteOperator(BaseOperator): 
""" 
Performs checks against a db. The ``CheckOperator`` expects 
a sql query that will return a single row. 

Note that this is an abstract class and get_db_hook 
needs to be defined. Whereas a get_db_hook is hook that gets a 
single record from an external source. 
:param sql: the sql to be executed 
:type sql: string 
""" 

template_fields = ('sql',) 
template_ext = ('.hql', '.sql',) 
ui_color = '#fff7e6' 

@apply_defaults 
def __init__(
     self, sql, 
     conn_id=None, 
     *args, **kwargs): 
    super(SqlExecuteOperator, self).__init__(*args, **kwargs) 
    self.conn_id = conn_id 
    self.sql = sql 

def execute(self, context=None): 
    logging.info('Executing SQL statement: ' + self.sql) 
    records = self.get_db_hook().get_first(self.sql) 
    logging.info("Record: " + str(records)) 
    records_int = int(records[0]) 
    print (records_int) 
    return records_int 

def get_db_hook(self): 
    return BaseHook.get_hook(conn_id=self.conn_id) 
1

私が正しい場合、クエリが値を返すときに気流が自動的にxcomにプッシュされます。しかし、postgresoperatorのコードを見ると、PostgresHook(dbapi_hookの拡張)のrunメソッドを呼び出すexecuteメソッドがあることがわかります。どちらのメソッドも何も返さないため、xcomに何もプッシュしません。 これを修正するために、PostgresOperatorのコピーである 'Custom.postgresSelectOperator'を作成しますが、 'hook.run(..)'ではなく 'return hook.get_records(..)'を作成します。

希望はあなたを助けます。

関連する問題