2016-09-02 9 views
0

私は15分ごとにS3イベントによってトリガーされるJavaラムダ機能を持っています。私は約3時間ごとに、ラムダの各呼び出しに最新のファイルがアップロードされていることと、その3時間以内にアップロードされたすべてのファイルが含まれていることに気付きました。S3ファイルがAWS Lambdaで複数回処理されています

リスト全体を反復処理する場合、以前のラムダ呼び出しですでに処理されていたファイルが繰り返されます。

アップロードされた最新のファイルのみを処理する方法を教えてください。 node.jsには、context.suceed()があります。これは、イベントが正常に処理されたものとみなします。 Javaはそれを持っていないようです。

以下は、Cloudwatchのログです。私は質問がしかし以下、マイケルによって回答されていると信じてい

08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST 
08:35:26 TIME - AUTHENTICATE: 8101ms 
08:35:26 TIME - MESSAGE PARSE: 1ms 
08:35:26 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
08:35:35 Processed 147 events 
08:35:35 TIME - FILE PARSE: 9698 
08:35:35 Found 1 event files 
08:35:35 Total function took: 17800ms 
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB 
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST 
08:45:03 TIME - AUTHENTICATE: 119ms 
08:45:03 TIME - MESSAGE PARSE: 0ms 
08:45:03 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
08:45:05 Processed 147 events 
08:45:05 data :: event/events/2016/ 08/31/2016 0831124500.export.csv 
08:45:06 Processed 211 events 
08:45:06 TIME - FILE PARSE: 2499 
08:45:06 Found 2 event files 
08:45:06 Total function took: 2618ms 
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB 
09:05:02 START RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST 
09:05:02 TIME - AUTHENTICATE: 98ms 
09:05:02 TIME - MESSAGE PARSE: 0ms 
09:05:02 data :: event/events/2016/ 08/31/2016 0831123000.export.csv 
09:05:03 Processed 147 events 
09:05:03 data :: event/events/2016/ 08/31/2016 0831124500.export.csv 
09:05:04 Processed 211 events 
09:05:04 data :: event/events/2016/ 08/31/2016 0831130000.export.csv 
09:05:04 Processed 204 events 
09:05:04 TIME - FILE PARSE: 2242 
09:05:04 Found 3 event files 
09:05:04 Total function took: 2340ms 
09:05:04 END RequestId: 8747aa 08-6f7b-11e6-80fd-f30a15cf07fc 

EDIT 1は、他の誰のためのコードの一部です。私は実際にレコードを保持するためにグローバルリストを使用しています。

パブリッククラスLambdaHandler {

private final List<GDELTEventFile> eventFiles = new ArrayList<>(); 
private AmazonS3Client s3Client; 
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim(); 

public void gdeltHandler(S3Event event, Context context) { 
    StopWatch sw = new StopWatch(); 
    long time = 0L; 

    sw.start(); 
    s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider()); 
    sw.split(); 
    System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms"); 
    time += sw.getSplitTime(); 
    sw.reset(); 

    sw.start(); 
    processEvent(event); 
    sw.split(); 
    System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms"); 
    time += sw.getSplitTime(); 
    sw.reset(); 

    sw.start(); 
    processFiles(); 
    sw.split(); 
    System.out.println("TIME - FILE PARSE: " + sw.getSplitTime()); 
    time += sw.getSplitTime(); 

    System.out.println("Found " + eventFiles.size() + " event files"); 
    System.out.println("Total function took: " + time + "ms"); 
} 

private void processEvent(S3Event event) { 
    List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords(); 
    for (S3EventNotification.S3EventNotificationRecord record : records) { 
     long filesize = record.getS3().getObject().getSizeAsLong(); 
     eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize)); 
    } 
} 

private void processFiles() { 
    for (GDELTEventFile event : eventFiles) { 
     try { 
      System.out.println(event.getBucket() + " :: " + event.getFilename()); 
      GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename()); 
      S3Object file = s3Client.getObject(request); 
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) { 
       CSVParser parser = new CSVParser(reader, CSV_FORMAT); 
       int count = 0; 
       for (CSVRecord record : parser) { 
         count++; 
        } 
       } 
       System.out.println("Processed " + count + " events"); 
      } 
     } catch (IOException ioe) { 
      System.out.println("IOException :: " + ioe); 
     } 
    } 
} 
+0

'data ::'行の情報はどこから来ていますか?私は、S3がイベントごとに複数のレコードを送信するのを見たことはありません...実際、私のコードは予期しないことがあったとしても、これを止めて例外をスローするように設計されています。コンテナの再利用の可能性を説明できなかった可能性があります。 Lambdaイベントが以前の呼び出しから同じコンテナを再利用する場合、古いイベントを保持しているグローバルなデータ構造を持つ可能性はありますか? –

+0

こんにちはMichael、実際の行がどこに印刷されているのかを尋ねるなら、関数内からそれらの行を表示しています。それは単なるSystem.out.printlnです。ファイル自体はgdeltサービスによって15分ごとに生成され、ec2インスタンスでダウンロードしてからcli経由でアップロードします。ラムダでコンテナを再利用する方法は?以前はそれを聞いたことがありません... – Brooks

+0

コンテナの再利用を説明するとき、私は[それが物事であることに気づく]ことを意味します(https://aws.amazon.com/blogs/compute/container-reuse-in-ラムダ/)とそれに応じたコード。関数が起動するたびに、ラムダ関数を実行しているアイドル状態のプロセスに起動しますが、これは新しいかもしれないし、前回の関数を実行した*同じ*プロセスかもしれませんし、新しいコードをアップロードしていないので)。S3からグローバル配列にレコードをプッシュしている場合、その配列を反復処理するときにハンドラにスコープを設定するのではなく、古いイベントが含まれることがあります。 –

答えて

3

これは、ラムダのcontainer reuseの重要な側面を一望符号の場合である - ラムダにおける容器の再使用は、プロセスの再利用を含みます。関数が再利用されたコンテナで実行されるとき、関数は以前と同じプロセスで実行されている必要があります。

S3のイベント通知データ構造は、イベントごとに複数のオブジェクトを含めることができるようなものですが、これは決して実現しません...しかし、イベント構造をグローバル構造にプッシュするということは、後の関数呼び出しで古いデータが表示されます。

これはキャッシュとして非常に役立ちますが、コードをどのように設計しなければならないかに大きな影響を及ぼします。常に1つの呼び出しから将来の呼び出し、それに応じてコードを生き延びることはできません。

コンテナを再利用することは、コンテナを再利用することでスペースが枯渇する可能性がある場合は、一時ファイルをクリーンアップする必要があることにも注意してください。

ファンクションコードを再デプロイすると、常に古いコンテナが破棄され、将来の最新バージョンの呼び出しで再利用されることはありません。

関連する問題