2017-03-02 4 views
0

私は以下のアプリケーションを持っています(私はこのフレームワークではかなり新しいです)。キューからメッセージを読み込むときにキャッシュサイズ(増加)を見たいのですが、すべて0のままです時間。Igniteキャッシュから正しく読み取る方法

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); 

    Ignition.setClientMode(true); 

    Ignite ignite = Ignition.start(); 

    Properties settings = new Properties(); 
    // Set a few key parameters 
    settings.put("bootstrap.servers", "localhost:9092"); 
    settings.put("group.id", "test"); 
    settings.put("zookeeper.connect", "localhost:2181"); 
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    // Create an instance of StreamsConfig from the Properties instance 
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings); 

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache"); 

    IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache"); 
    // allow overwriting cache data 
    stmr.allowOverwrite(true); 

    kafkaStreamer.setIgnite(ignite); 
    kafkaStreamer.setStreamer(stmr); 

    // set the topic 
    kafkaStreamer.setTopic("test"); 

    // set the number of threads to process Kafka streams 
    kafkaStreamer.setThreads(1); 

    // set Kafka consumer configurations 
    kafkaStreamer.setConsumerConfig(config); 

    // set decoders 
    StringDecoder keyDecoder = new StringDecoder(null); 
    StringDecoder valueDecoder = new StringDecoder(null); 

    kafkaStreamer.setKeyDecoder(keyDecoder); 
    kafkaStreamer.setValueDecoder(valueDecoder); 

    kafkaStreamer.start(); 

    while (true) { 

     System.out.println(cache.metrics().getSize()); 
     Thread.sleep(200); 
    } 

誰かが間違っているか間違っていると言うことができますか?

ありがとうございます!

答えて

1

おそらく、IgniteDataStreamerのバッファを満たすのに十分なエントリを消費しないでしょう。フラッシュタイムアウトを設定してください:

stmr.autoFlushFrequency(1000); 
0

メトリックは、パフォーマンスの理由からデフォルトでは無効になっています。設定ファイルのCacheConfiguration.setStatisticsEnabled(true)またはstatisticsEnabledプロパティを使用してメトリックを有効にすることができます。

関連する問題