2016-10-18 4 views
1

kafka-nodeを使用してkafkaでNodeを初めて使用しています。メッセージを消費するには、外部APIを呼び出す必要があります。外部APIを呼び出す必要があります。私は消費者が突然失敗するのを克服したい。消費者が失敗した場合、それを消費する消費者は仕事が完了しなかったという同じメッセージを受け取る。kafka-nodeを使用して消費されたkafkaメッセージのコミットを制御する方法

私はkafka 0.10を使用しており、ConsumerGroupを使用しようとしています。

私はオプションで設定し、その作業が完了した時点でメッセージをコミットすることを考えました(前に以前にいくつかのJavaコードで行ったように)。

しかし、いったん完了したらメッセージを正しくコミットする方法がわからないようです。どうすればいいですか?

もう一つの心配は、コールバックのために、前のメッセージが終了する前に次のメッセージが読み込まれているように見えることです。メッセージx + 2がメッセージx + 1の前に終了した場合、オフセットはx + 2に設定されるため、失敗した場合、x + 1は決して再実行されません。ここで

は、私がこれまで何をしたか、基本的である:あなたがここに行うことができます

var options = { 
    host: connectionString, 
    groupId: consumerGroupName, 
    id: clientId, 
    autoCommit: false 
}; 

var kafka = require("kafka-node"); 
var ConsumerGroup = kafka.ConsumerGroup; 

var consumerGroup = new ConsumerGroup(options, topic); 

consumerGroup.on('connect', function() { 
    console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic); 
}); 

consumerGroup.on('message', function(message) { 
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset); 
    console.log(message.value); 
    doSomeStuff(function() { 
     // HOW TO COMMIT???? 
     consumerGroup.commit(function(err, data) { 
      console.log("------ Message done and committed ------"); 
     }); 
    }); 
}); 

consumerGroup.on('error', function(err) { 
    console.log("Error in consumer: " + err); 
    close(); 
}); 

process.once('SIGINT', function() { 
    close(); 
}); 

var close = function() { 
    // SHOULD SEND 'TRUE' TO CLOSE ??? 
    consumerGroup.close(true, function(error) { 
     if (error) { 
      console.log("Consuming closed with error", error); 
     } else { 
      console.log("Consuming closed"); 
     } 
    }); 
}; 

答えて

0

ことの一つは、あなたが処理するすべてのメッセージの再試行メカニズムを持つことです。

あなたはこのスレッドで私の答えを相談することができます。 https://stackoverflow.com/a/44328233/2439404

私はasync/cargoを使用してそれらを一緒にkafka-consumer、バッチを使用してカフカからのメッセージを消費し、async/queue(インメモリキュー)に入れ。このキューは、私がasync/retryableを渡す引数としてワーカー関数をとります。

ご迷惑をおかけしても、再試行可能を使用してメッセージの処理を行うことができます。 https://caolan.github.io/async/docs.html#retryable

これで問題が解決する場合があります。

+0

あなたのプロフィールへの回答ポイントへのリンク –

+0

完了。指摘してくれてありがとう –

関連する問題