0

イムカフカと協力最初の数秒でメッセージを生成するために遅く、私はそのようなプロデューサー行わ:あなたはそれが非常に簡単である見ることができるようカフカは

synchronized (obj) { 

     while (true){ 

      long start = Instant.now().toEpochMilli(); 
      for (int i=0; i< NUM_MSG_SEC ; i++) 
      { 

       PriceStreamingData data = PriceStreamingData.newBuilder() 
         .setUser(getRequest().getUser()) 
         .setSecurity(getRequest().getSecurity()) 
         .setTimestamp(Instant.now().toEpochMilli()) 
         .setPrice(new Random().nextDouble()*200) 
         .build(); 


       record = new ProducerRecord<>(topic, keyBuilder.build(data), 
         data); 



       producer.send(record,new Callback(){ 
        @Override 
        public void onCompletion(RecordMetadata arg0, Exception arg1) { 
         counter.incrementAndGet(); 
         if(arg1 != null){ 
          arg1.printStackTrace(); 
         } 


        } 
       }); 

      } 
      long diffCiclo = Instant.now().toEpochMilli() - start; 
      long diff = Instant.now().toEpochMilli() - startTime; 


      System.out.println("Number of sent: " + counter.get() + 
        " Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff); 

      try { 
       if(diffCiclo >= 1000){ 
        System.out.println("over 1 second: " + diffCiclo); 

       } 
       else { 
        obj.wait(1000 - diffCiclo); 

       } 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 




     } 
    } 

、それだけで新しいメッセージを作成し、それを送信します。 私がログ表示された場合:最初の10秒で

NumberOfSent/Diff(K) 

を、それは非常に悪い行うだけ

30k per second 

60秒後に私は

180k per second 

理由がありますか?どのようにしてすでに180kにプロセスを開始することができますか?

私カフカのプロデューサーの構成はFollwing

Async producer (but also with sync producer the situation dose not change) 
    ACKS_CONFIG = 0 
    BATCH_SIZE_CONFIG = 20000 
    COMPRESSION_TYPE_CONFIG = none 
    LINGER_MS_CONFIG = 0 

細部である:

NUM_MSG_SEC is set to 200000 or bigger number 
+0

遅延を引き起こす 'obj'にロックがありますか? 'isRunning()'がtrueを返すのはいつですか? –

+0

他に何もobjをロックしていない、遅れを引き起こすものは何もないと思います。遅延はどこかではないのですが、コードは非常にシンプルですが、私はそれがkafka設定の周りにあると思います。(isRunningは常にtrueです) –

+0

おそらく 'synchronized(obj)'の直後にlog文を追加して、コードが実際に実行された時期を判断してください。おそらくあなたのコードが実行される前に何が起こっているかを見るためにデバッグでも実行されます。 –

答えて

0

私が自分で解決策を見つけたと、私はこの記事があまりにも他の人のために有用であることを願って。

問題は、私のパラメータは、私はより高い値200000と最後に1000に彼らにそれらを設定しなかった問題を修正するために、20000と0だった

ProducerConfig.BATCH_SIZE_CONFIG 

ProducerConfig.LINGER_MS_CONFIG 

に立ちます私はパラメータを使ってJVMを起動しました:

-XX:MinMetaspaceFreeRatio=100 
-XX:MaxMetaspaceFreeRatio=100 

メタスペースを適切な値に設定します。

ここで、プロデューサは140kで直接開始し、1秒で180kになります。

関連する問題