2017-09-12 1 views
1

エアフロースクリプトを使用してdataflow jarを実行しようとしています。私はDataFlowJavaOperatorを使用しています。 PARAMジャーでは、私はこのジョブを実行しようとすると、私はエアフロー例外:リターンコード1でDataFlowが失敗しました

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete. 
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run 
    result = task_copy.execute(context=context) 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute 
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar) 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow 
    task_id, variables, dataflow, name, ["java", "-jar"]) 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow 
    _Dataflow(cmd).wait_for_done() 
    File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done 
    self._proc.returncode)) 
Exception: DataFlow failed with return code 1` 

としてエラーが発生しますローカルsystem.Butで実行可能なjarファイルの存在のパスを渡しています私の気流スクリプトは次のとおりです。

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator 
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook 
from airflow.models import BaseOperator 
from airflow.utils.decorators import apply_defaults 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'airflow', 
'start_date': datetime(2017, 03, 16), 
'email': [<EmailID>], 

'dataflow_default_options': { 
     'project': '<ProjectId>', 
     # 'zone': 'europe-west1-d', (i am not sure what should i pass here) 
     'stagingLocation': 'gs://spark_3/staging/' 
    } 
} 

dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2), 
default_args=default_args) 

dataflow1 = DataFlowJavaOperator(
task_id='dataflow_example', 
jar ='/root/airflow_scripts/csvwriter.jar', 
gcp_conn_id = 'GCP_smoke', 
dag=dag) 

私は誰が私はこの

Note :I am creating this jar while selecting option as Runnable JAR file by packaging all the external dependencies.

答えて

1

から抜け出すために助けてください問題は、私が使っていた瓶とあった、私が作っていますどのような間違いを確認していません。 jarを使用する前にjarが期待どおりに実行されていることを確認してください。

例: あなたのjarファイルがdataflow_job1.jarだった場合、あなたのjarファイルが正常に実行されたら、エアフローDataflowJavaOperatorジャージャーを使用して進み

java -jar dataflow_job_1.jar --parameters_if_any 

を使用してjarファイルを実行します。

さらに コーダーに関連するエラーが発生した場合は、コードを実行するために独自のコーダーを作成する必要があります。それはデフォルトのコーダを持つdidnotよう は例えば、私はのTableRowクラスに問題があったので、私はこれを補うために持っていた:

TableRowCoder:

public class TableRowCoder extends Coder<TableRow> { 
private static final long serialVersionUID = 1L; 
private static final Coder<TableRow> tableRow = TableRowJsonCoder.of(); 
@Override 
public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException { 
    tableRow.encode(value, outStream); 

} 
@Override 
public TableRow decode(InputStream inStream) throws CoderException, IOException { 
    return new TableRow().set("F1", tableRow.decode(inStream)); 
} 
@Override 
public List<? extends Coder<?>> getCoderArguments() { 
    // TODO Auto-generated method stub 
    return null; 
} 
@Override 
public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { 


} 
} 

その後

を使用して、あなたのコード内でこのコーダを登録します
pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder()) 

まだ(コーダーとは関係ありません)エラーがある場合は移動するには:

*.jar\META-INF\services\FileSystemRegistrar 

を実行し、発生する可能性のある依存関係を追加します。私はそれを動作させるために以下の行を追加する必要がありました

Unable to find registrar for gs 

:として

は、例えば、ステージングエラーがある可能性があります。

org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar 
関連する問題