PubSubから読み込み、いくつかのフィールドを抽出してbigtableに書き込むデータフローストリーミングジョブがあります。私たちは、自動スケーリング時にデータフローのスループットが低下するのを観察しています。たとえば、データフロージョブが現在2人のワーカーで実行されていて、100メッセージ/秒の速度で処理している場合、自動スケーリング中に100メッセージ/秒のこのレートが低下し、何回か0に低下してから500メッセージに増加します/秒。私たちは毎回これを見ています。これは、オートスケーリング中にシステムのラグが高くなり、パブ/サブの応答されないメッセージが大きくなっています。自動スケーリング中にGoogle Dataflowスループットが低下する
これはデータフロー自動スケーリングの予想される動作ですか、それとも自動応答して未確認メッセージのスパイスを最小限に抑えながらこの100メッセージ/秒を維持する方法ですか?私は、パブ/サブのStackdriverとデータフローのスクリーンショットを添付しています2017-10-23_12_29_09-11538967430775949506
: (ご注意:100メッセージ/秒、500のメッセージ/秒は一例の数字です)
プル要求毎回のデータフローの自動スケーリングの数の低下があります。私はタイムスタンプ付きのスクリーンショットを撮ることができませんでしたが、プルリクエストをドロップすると、時間データフロー自動スケーリングと一致します。 ===========編集:=======================
私たちは以下を使用してGCSに並列で書いています前述のウィンドウ処理。
実際に何が起こっているWindowedFileNames.java
public class WindowedFileNames extends FilenamePolicy implements OrangeStreamConstants{
/**
*
*/
private static final long serialVersionUID = 1L;
private static Logger logger = LoggerFactory.getLogger(WindowedFileNames.class);
protected final String outputPath;
public WindowedFileNames(String outputPath) {
this.outputPath = outputPath;
}
@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow intervalWindow = (IntervalWindow) context.getWindow();
DateTime date = intervalWindow.maxTimestamp().toDateTime(DateTimeZone.forID("America/New_York"));
String fileName = String.format(FOLDER_EXPR, outputPath, //"orangestreaming",
DAY_FORMAT.print(date), HOUR_MIN_FORMAT.print(date) + HYPHEN + context.getShardNumber());
logger.error(fileName+"::::: File name for the current minute");
return outputDirectory
.getCurrentDirectory()
.resolve(fileName, StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
return null;
}
}
これを見て – Pablo