2017-01-12 9 views
0

カフカの消費者から同じコードを読み込んで同じトピックIDから同じトピックを読んで、効率的に大量のトピックを消費するようにしました。しかし、それは正常に起動した後にエラーが出ると思われ、私はそれが消費者がメッセージを待ってから失敗すると考えています。しかし、問題は、ある消費者がシャットダウンすると、他の消費者もシャットダウンすることです。mulithreaded kafka消費者、ノーシュアエント例外なし

ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
    try { 

     while (it.hasNext()) 
     { 
     try{ 

      String mesg = new String(it.next().message()); 
      System.out.println(mesg); 
      if (StringUtils.isEmpty(mesg)){ 
       continue; 
      } 
      System.out.println("Thread " + m_threadNumber + ": " + 
        new  String(it.next().message())); 
      mesg = messageFormat.createMsg(mesg); 
      System.out.println("MESSAGE TRANSMISSION SUCCESSFUL!"); 
      }   
      catch(Exception e) 
      { 
       e.printStackTrace(); 
       continue; 
      } 

      } 
      }catch(Exception e) 
      { 
     e.printStackTrace(); 

    } 

    System.out.println("Shutting down Thread: " + m_threadNumber); 
    //System.out.println("Shutting down Thread: " + m_threadNumber); 
    } 

ここに例外エラーがあります。

NoSuchElementException exception 

ここでは彼の仕事が欲しいと願っていますので、ここで助けてください。前もって感謝します。

+0

例外スタックトレースを送信すると役立つ可能性があります。 –

答えて

0

置き換えます。 System.out.println( "Thread" + m_threadNumber + ":" + new String(it.next()。message()));

と System.out.println( "Thread" + m_threadNumber + ":" + mesg);

助けてください。

ログを出力する場合は、イテレータに次の要素がない可能性があるため、例外が発生するため、再度ログを出力します。

関連する問題