2017-06-16 4 views
2

Apache Flink用の非常に簡単なJavaプログラムを作成しました。スループット(1秒あたり処理されるタプル数)や待ち時間(プログラムが毎回処理する必要がある時間など)入力タプル)。Apache Flinkでのスループットと待ち時間

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

しかし、私は、私が欲しいものを得るためにそれらを使用する方法がわからないです:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv") 
     .map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv"); 

JobExecutionResult res = env.execute(); 

私はFLINKは、いくつかの指標を公開することを知っています。リンクから、平均スループットを測定するために「メーター」を使用することができますが、定義した後、どのように使用すればよいですか?

+0

正確にあなたは苦労していますか?スループットのために、あなたが提供したリンクのように 'MyMapper'関数に' Meter'を登録します。 Flink Webダッシュボードでメトリックをライブで見ることができます。 – us2012

+0

私がmyMeterクラスを実装する必要があるという指示に従えば、私は何か試してみましたが動作しません。 DropWizardメーターを使用してスタンドアロンモードで実行しようとすると、pom.xmlに依存関係を含めてもエラー(java.lang.NoClassDefFoundError:com/codahale/metrics/Meter )があります。 – LizardKing

答えて

1

私たちは、本物のストリーミングジョブで糸で動作するmeter、gaugeなどのカスタムメトリックを実行しています。私たちは、その後、MyMapperクラスにメーターを追加

バージョン1.2.1を使用している

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-metrics-dropwizard</artifactId> 
    <version>${flink.version}</version> 
</dependency> 

追加の依存関係がのpom.xmlする:ここで

は手順です。

import org.apache.flink.api.common.JobExecutionResult; 
import org.apache.flink.api.common.functions.RichMapFunction; 
import org.apache.flink.configuration.Configuration; 
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; 
import org.apache.flink.metrics.Meter; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 


public class Test { 


    public static void main(String[] args) throws Exception { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     env 
       .readTextFile("/home/LizardKing/Documents/Power/Prova.csv") 
       .map(new MyMapper()) 
       .writeAsCsv("/home/LizardKing/Results.csv"); 

     JobExecutionResult res = env.execute(); 
    } 


    private static class MyMapper extends RichMapFunction<String, Object> { 

     private transient Meter meter; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      this.meter = getRuntimeContext() 
        .getMetricGroup() 
        .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter())); 
     } 

     @Override 
     public Object map(String value) throws Exception {  
      this.meter.markEvent(); 
      return value; 
     } 
    } 
} 

これが役に立ちます。

+0

それは助けて、私はまた別の問題を抱えていました。私はこのプログラムをflinkで(IDEではなく)実行しようとしたとき、pom.xmlに依存関係を含めるには不十分であることがわかりました。私はフリンクするためにライブラリを提供しなければならなかった、私が提案したアプローチはmaven-shadeプラグインを使うことです。アップロードされたjarに依存関係をパックする必要があります。 – LizardKing

関連する問題