2016-12-28 11 views
0

kafkaを入力として使用し、logstashを出力として使いたいです。私はlogstashにいくつかのトピックをフィードし、トピックに従ってフィルタリングしたいと思う。私はそのようなコードを書こうとしました:書き方kafkaトピックをフィルタリングするためのLogstashフィルタ

input { 
    kafka { 
     bootstrap_servers => "localhost:9092" 
     topics => ["test", "payment"] 
     } 
} 

filter { 
    if [topic] = "test" { 
     //do something 
    } else { 
     //do something 
    } 
} 

しかし、それは動作しないようです。

+0

はあなたが直面している問題は何ですか?どんな種類のエラー? 'topic'ではなく' topics'でなければならない 'topic'フィルタに' s'がありませんか? – Kulasangar

答えて

0

を追加してkafkaフィールドを追加する必要があります。

トピックにメッセージサイズなどのKafkaメタデータを追加するオプション。これにより、kafkaという名前のフィールドが次の属性を含むlogstashイベントに追加されます。topic:このメッセージが関連するトピックconsumer_group:このイベントを読み込むために使用されるコンシューマ・グループ:このメッセージが関連付けられるパーティションoffset:パーティションは、このメッセージは、キーに関連付けられている:のByteBufferキー

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events

メッセージを含む、次いでを追加することによって、この

input { 
    kafka { 
    bootstrap_servers => "localhost:9092" 
    topics => ["test", "payment"] 
    } 
} 

filter { 
    if [kafka][topic] = "test" { 
    //do something 
    } else { 
    //do something 
    } 
} 
0

等CONF変更入力部を更新します3210を入力してkafkaフィールドを追加します。次のように

input { 
    kafka { 
     bootstrap_servers => "localhost:9092" 
     topics => ["test", "payment"] 
     decorate_events => true 
    } 
} 

変更フィルタ部を:

filter { 
    if [@metadata][kafka][topic] == "test" { 
     //do something 
    } else { 
     //do something 
    } 
} 
関連する問題