2017-12-22 8 views
0

のKafkaExceptionをキャッチすることはできません私のコードの一部です:Pythonはここconfluent_kafka

class KafkaProducer: 

    def __init__(self): 
    pass 

    bootstrap_server_host = system_config.get_kafka_bootstrap_server() 
    producer = Producer({'bootstrap.servers': bootstrap_server_host, "log.connection.close":False}) 

    @classmethod 
    def send(cls, topic, key, value, data_type=None, uid=None): 
     try: 
      data = {"data": value, "createTime": long(time.time() * 1000)} 
      if data_type is not None: 
      data["type"] = int(data_type) 
      if uid is not None: 
      data["uid"] = long(uid) 
      cls.producer.produce(topic, json.dumps(data), key) 
      cls.producer.poll(0) 
     except BufferError as e: 
      logger.error('%% Local producer queue is full ' \ 
         '(%d messages awaiting delivery): try again\n' % 
         len(cls.producer)) 
      raise e 

class new_application_scanner(): 
   @classmethod 
    def scan_new_application(cls): 
     db_source = None 
     try: 
      db_source = DBConnector().connect() 
      db_cur = db_source.cursor() 

      ... 

         KafkaProducer.send("RiskEvent", str(uid), 
        {"uid": uid, "country_id": user_info[1], "event_id": constant.RISK_EVENT_NEW_APPLICATION}) 

      ... 
     except Exception as e: 
      logger.error(traceback.format_exc()) 
     finally: 
      if db_source is not None: 
      db_source.close() 




def run_scan_new_application(): 
    while is_scan_new_application_active: 
    try: 
     logging.info("scan_new_application starts at %s",time.time()) 
     new_application_scanner.scan_new_application() 
     logging.info("scan_new_application ends at %s", time.time()) 
    except Exception as e: 
     logging.error("new_application_scanner Error:%s",format(e)) 
     logging.error(traceback.format_exc()) 
    time.sleep(10) 


t1 = threading.Thread(target=run_scan_new_application, name='run_scan_new_application', args=([])) 
t1.start() 

私は2台のサーバーのカフカ・グループを持っています。 2つのサーバを1つずつ再起動すると、KafkaProducer.send()はKafkaException(おそらくconfluent_kafkaのいくつかのバグ)をスローし、いくつかの例外ログがあります。

奇妙なことに、例外はscan_new_applicationからスローされ続け、run_scan_new_applicationにも例外ログがあります。でもスレッドstopped.Hereは、例外のログです:

2017-12-21 07:11:49 INFO pre_risk_control_flow.py:71 pid-16984 scan_new_application starts at 1513840309.6 
2017-12-21 07:11:49 ERROR new_application_scan.py:165 pid-16984 Traceback (most recent call last): 
    File "/home/ubuntu/data/code/risk/Feature_Engine/data_retrive/pre_risk_control_flow/new_application_scan.py", line 163, in scan_new_application 
    {"uid": uid, "country_id": user_info[1], "event_id": constant.RISK_EVENT_NEW_APPLICATION}) 
    File "/home/ubuntu/data/code/risk/Feature_Engine/data_retrive/kafka_client/Producer.py", line 27, in send 
    cls.producer.produce(topic, json.dumps(data), key) 
KafkaException: KafkaError{code=_UNKNOWN_TOPIC,val=-188,str="Unable to produce message: Local: Unknown topic"} 

2017-12-21 07:11:49 ERROR pre_risk_control_flow.py:75 pid-16984 new_application_scanner Error:KafkaError{code=_UNKNOWN_TOPIC,val=-188,str="Unable to produce message: Local: Unknown topic"} 

答えて

0

基礎となるクライアントがKafkaException KafkaError{code=_UNKNOWN_TOPIC..}を上げていることは(今)要求されたトピックは、クラスタ内に存在しない(と自動トピックの作成が無効になっている)知っているので。これは予想されます。

send()にキャッチしていないため、例外はrun_scan_new_applicationに表示されます。