2016-08-30 10 views
0

私はデータフローを使用して、BigQueryIO.Write.to()を使用してBigQueryにデータを書き込みます。Dataflow:BigQueryIOでの書き込み時のSocketTimeoutException

は時々、私はデータフローからこの警告が出ます:

{ 
metadata: { 
    severity: "WARNING"  
    projectId: "[...]"  
    serviceName: "dataflow.googleapis.com"  
    region: "us-east1-d"  
    labels: { 
    compute.googleapis.com/resource_type: "instance"  
    compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"  
    dataflow.googleapis.com/region: "us-east1-d"  
    dataflow.googleapis.com/job_name: "[...]"  
    compute.googleapis.com/resource_id: "[...]"  
    dataflow.googleapis.com/step_id: ""  
    dataflow.googleapis.com/job_id: "[...]"  
    } 
    timestamp: "2016-08-30T11:32:00.591Z"  
    projectNumber: "[...]"  
} 
insertId: "[...]" 
log: "dataflow.googleapis.com/worker" 
structPayload: { 
    message: "exception thrown while executing request"  
    work: "[...]"  
    thread: "117"  
    worker: "dataflow-[...]-08240401-e41e-harness-7dkd"  
    exception: "java.net.SocketTimeoutException: Read timed out 
    at java.net.SocketInputStream.socketRead0(Native Method) 
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
    at java.net.SocketInputStream.read(SocketInputStream.java:170) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
    at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704) 
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647) 
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535) 
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440) 
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) 
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338) 
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37) 
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94) 
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) 
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) 
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:229) 
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:222) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745)"  
    logger: "com.google.api.client.http.HttpTransport"  
    stage: "F5"  
    job: "[...]"  
} 
} 

私はこの1つ、次のいずれかの「再試行」のログが表示されません。

私の質問は以下のとおりです。私は、データを失うこと

  • アム?書き込み操作が正しく行われたかどうかはわかりません。コードを正しく理解すると、書き込みバッチ全体が不確定な状態になります。
  • もしそうなら、データを正確に1回BigQueryに書き込む方法がありますか?
  • もしそうなら、WARNINGの代わりにERRORの重大度はありませんか?

ここに私の使用の文脈のビットです:

  • 私は「時々」0~3倍であることができるKafkaIO.java
  • を使用してカフカからの読み込み、ストリーミングモードでデータフローを使用しています時速
  • 仕事によって、私は仕事に応じてタイプN1-標準-4
  • の2〜36人の労働者を使用しています、私はBigQueryの
  • に3K 10Kへのメッセージ/秒から書いている
  • のAv erageメッセージのサイズが3KB
  • データフローの労働者たち-east1-Dゾーンにあるされ、BigQueryのデータセットの場所は米国

答えて

1

であるあなたは、これらのエラーは、BigQueryのストリーミングサービスからの過渡的な問題に関連する表示されます。私の経験では、あなたは仕事の人生の中でこれらが飛び散っているのを見るかもしれないということです。これらのログの大規模なブレークアウトが表示されるのは、通常、BigQueryストリーミングサービスに障害が発生していることを意味します。

Cloud Dataflowはリクエストの行を再試行します(コードBigQuery... line 290を参照)。警告の後のある時点でログ項目やテーブルのレコードが表示されない場合は、別のエラーがあります。

ストリーミングモードでは、サービスは無期限に再試行します。この問題のために仕事が失敗しないことを意味します。私たちは永遠に試していますので、これがエラーであるか警告であるかは問われています。これを内部的に議論しますが、Apache Beam user groupにメモを投稿して議論を進めることができます:-)

クラウドロギングでその警告メッセージにメトリックを作成して対処できます。私たちはStackdriverの統合を深めています。これは良い使用例です。

データを失うことはなく、BigQueryへのデータの到着が遅れることになります。私はいくつかの単純な固定ウィンドウを構築し、イベント処理時間を使って1分のウィンドウを数えます。それから、私は時間の経過と共にカウントダウンを新鮮さの指標として見ます。固定ウィンドウがウォーターマークの後ろに遅れている場合は、挿入に何か問題があります。この例外パスは、この場合により問題を律速するためにテストするApiErrorExtractor()を呼び出し、から継承のIOExceptionの場合、コメント

に基づい

  • 追加の清澄化のために編集。

    この場合、SocketTimeoutはレート制限によるものではないため、例外は呼び出し側にスローされます。発信者はfinishBundleのBigQuery.IO行2308です。 IOExceptionをキャッチしてRuntimeExceptionをスローするflushRows()を呼び出します。

    スティミングモードでは、この方法で失敗したバンドルは、無期限に再試行されます。注:バッチモードでは、ランナーは4回試行して失敗します。

    この場合(レート制限なしの場合)、行ログを再試行しません。

    データは失われず、バンドルが再試行されると遅延されます。

    最悪のシナリオは、すべての作業者がこの問題を経験しているため、パイプラインが進行できないことです。これは、BigQueryストリーミングサービスが停止しているか、すべての接続を切断した場合に発生します。今度は、BiqQueryのインジェストサービスが安定してバンドルが通過すると、レート制限ケースが発生するかもしれませんが、コードをオフにすることでこれらのエラーを抑えることができます。

    極端に悪いケースは、着信パイプラインのデータレートが、BigQueryストリーミング受信サービスによって管理される最大書き込みレート(レート制限レート)に常に近づいていることです。したがって、再試行(一時的またはその他)からバックログが発生した場合、パイプラインが追いつかないことがあります。

    ストリームデータフローにはドレイン機能があり、受信データの処理を停止してパイプラインを進め、未処理のすべてのウィンドウを正常に排除します。ただし、drainではfinishBundle()が成功する必要があります。したがって、この場合(SocketTimeout)ドレインがスタックされます。パイプライン対ドレインを終了した場合、未完成バンドルのデータ損失が発生します。

    BigQuery.IOロジックをオーバーライドして、別の場所でエラーが発生しているデータをパイプすることができます。あなたはこれを行うことができますが、私はBigQueryストリーミングサービスを利用してターミナルの停止を決してしません。これは、レート制限の近いレートで常に稼働していて、回復不能なバックログ処理に敏感である場合、レート制限の問題を回避するために、異なるリダクションまたはシャーディング・メカニズムを実装することができます。

    バックログ復旧に関するもう1つの提案は、ストリーミングソースへのイベントフローを停止できることです。たとえば、Pub/Subのトピックへの書き込みを停止します。あなたはサブスクリプションで別のトピックに書き始めます。既存のデータフローパイプラインは、既存のトピックではなくなります。新しいサブスクリプションで新しいバックログを処理する方法についてはまだ対処する必要がありますが、少なくとも既存のパイプライン内のデータは失われないことが保証されています。

    イベント時間処理を使用していない場合は、この方法が有効です。ただし、イベント時間処理を使用している場合、ウィンドウは重複した出力を持ち、両方ともONTIMEとマークされますが、そうではありません。

    あなたのユースケースに関連して私のここでの前提はたくさんありますが、あなたの質問として共有したいと思っていました。

    これが役に立ちます。

+0

ありがとうございます。しかし、私はDataflowがこのバッチを再試行するとは思わない。例外がスローされるので、BigQueryによって返されたエラー(もしあれば、それを読み出すまでにタイムアウトした)は 'futures' [(#L221)]に追加されません(https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/ ee25e238e65fc71b5db7ba0dace4b45d19dbf07a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java#L221)の一覧をご覧ください。したがって、 'allErrors'(#L283)は空であり、再試行はありません。 – A21z

+0

私は、投げられた例外が呼び出し元によってどのように処理され、今日後で戻ってくるか見ていきます。 –

+0

A21z - コメントに返信する際に追加情報を追加しました。これが助けにならないかどうか私に教えてください。 –

関連する問題