私たちは、本物のストリーミングジョブで糸で動作するmeter、gaugeなどのカスタムメトリックを実行しています。私たちは、その後、MyMapperクラスにメーターを追加
バージョン1.2.1を使用している
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>
追加の依存関係がのpom.xmlする:ここで
は手順です。
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper())
.writeAsCsv("/home/LizardKing/Results.csv");
JobExecutionResult res = env.execute();
}
private static class MyMapper extends RichMapFunction<String, Object> {
private transient Meter meter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}
@Override
public Object map(String value) throws Exception {
this.meter.markEvent();
return value;
}
}
}
これが役に立ちます。
正確にあなたは苦労していますか?スループットのために、あなたが提供したリンクのように 'MyMapper'関数に' Meter'を登録します。 Flink Webダッシュボードでメトリックをライブで見ることができます。 – us2012
私がmyMeterクラスを実装する必要があるという指示に従えば、私は何か試してみましたが動作しません。 DropWizardメーターを使用してスタンドアロンモードで実行しようとすると、pom.xmlに依存関係を含めてもエラー(java.lang.NoClassDefFoundError:com/codahale/metrics/Meter )があります。 – LizardKing