2016-05-20 4 views


filebeat --> logstash --> elastic search --> kibana 


@timestamp      messageId        event 
May 19th 2016, 02:55:29.003  00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 
May 19th 2016, 02:55:29.200  00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 
May 19th 2016, 02:55:29.205  00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 
May 19th 2016, 02:55:29.453  00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit 


messageId        in1:1->2:in2 
00e02f2f-32d5-9509-870a-f80e54dc8775 197:5:248 



logstash [集合フィルタ(https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html) – Val


@valに記載の本の良い例がありますこの例では、経過時間がログ行にある必要があります。2つの別々のログ行から計算しません。たぶん私は 'Elapsed {}'プラグインと一緒にそれを何らかの形で使うことができます。 – RaGe



これはLogstash aggregate filterのみで実現できますが、elapsed filterが既に行っていることを実質的に再実装する必要があります。

次に、Logstash aggregate filterelapsed filterの組み合わせを使用してみましょう。後者は各ステージの時間を測定するために使用され、前者はすべてのタイミング情報を最後のイベントに集計するために使用されます。

サイドノート:タイムスタンプ形式を再考して、解析のための標準とすることができます。私は簡単に解析できるようにISO 8601に変換しましたが、独自の正規表現を自由に使うことができます。


2016-05-19T02:55:29.003 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 
2016-05-19T02:55:29.200 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 
2016-05-19T02:55:29.205 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 
2016-05-19T02:55:29.453 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit 


       "@timestamp" => "2016-05-21T04:20:51.731Z", 
         "tags" => [ "elapsed", "elapsed_match", "in1" ], 
       "elapsed_time" => 0.197, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.003Z" 


filter { 
    grok { 
    match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] 
    add_tag => [ "%{event}" ] 
    date { 
    match => [ "timestamp", "ISO8601"] 
    # Measures the execution time of system1 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Enter" 
    end_tag => "system1Exit" 
    new_event_on_match => true 
    add_tag => ["in1"] 
    # Measures the execution time of system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system2Enter" 
    end_tag => "system2Exit" 
    new_event_on_match => true 
    add_tag => ["in2"] 
    # Measures the time between system1 and system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Exit" 
    end_tag => "system2Enter" 
    new_event_on_match => true 
    add_tag => ["1->2"] 
    # Records the execution time of system1 
    if "in1" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] = [(event['elapsed_time']*1000).to_i]" 
     map_action => "create" 
    # Records the time between system1 and system2 
    if "1->2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event['elapsed_time']*1000).to_i" 
     map_action => "update" 
    # Records the execution time of system2 
    if "in2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event['elapsed_time']*1000).to_i; event['report'] = map['report'].join(':')" 
     map_action => "update" 
     end_of_task => true 


       "@timestamp" => "2016-05-21T04:20:51.734Z", 
         "tags" => [ "elapsed", "elapsed_match", "1->2" ], 
       "elapsed_time" => 0.005, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.200Z" 

を、あなたはどのように示して、このような新しいイベントを、取得します多くのティ私はsystem2、すなわち248msに費やされました。そのイベントはまた、私はlogstash 5.4で、この作品のためにいくつかの微調整をしなければならなかったメッセージ

       "@timestamp" => "2016-05-21T04:20:51.736Z", 
         "tags" => [ "elapsed", "elapsed_match", "in2" ], 
       "elapsed_time" => 0.248, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.205Z" 
        "report" => "197:5:248" 

おかげさまで、私は同じウサギの穴を下っていました。私が経過したフィルタについて気づいたことは、それぞれのログ行がいくつかの経過したフィルタのうちの1つと一致するため、一致しないすべてのものが '_grokparsefailure'タグを追加することです。私は 'tag_on_failure => []'を経過フィルタに追加しました。 – RaGe


あなたはこの作品を期待通りに作れましたか? – Val


私は 'to_i 'で見ているエラーを解明しようとしています。明らかにマップ行の入力は文字列ではなく配列です。 '集計例外が発生しました。エラー:[197.0]の未定義メソッド 'to_i:Array' – RaGe



filter { 
    grok { 
    match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] 
    add_tag => [ "%{event}" ] 
    date { 
    match => [ "timestamp", "ISO8601"] 
    # Measures the execution time of system1 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Enter" 
    end_tag => "system1Exit" 
    new_event_on_match => true 
    add_tag => ["in1"] 
    # Measures the time between system1 and system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Exit" 
    end_tag => "system2Enter" 
    new_event_on_match => true 
    add_tag => ["1->2"] 
    # Measures the execution time of system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system2Enter" 
    end_tag => "system2Exit" 
    new_event_on_match => true 
    add_tag => ["in2"] 
    # Records the execution time of system1 
    if "in1" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] = (event.get('elapsed_time')*1000).to_i" 
     map_action => "create" 
    # Records the time between system1 and system2 
    if "1->2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event.get('elapsed_time')*1000).to_i" 
     map_action => "update" 
    # Records the execution time of system2 
    if "in2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event.get('elapsed_time')*1000).to_i" 
     map_action => "update" 
     end_of_task => true 

経過フィルタと集計フィルタは正しく動作しません複数のlogstashワーカーが使用されている場合その場合、同じ問題をどのように解決するのでしょうか?ありがとう。 –


1つの作業を使用するようにします。これはボトルネックになる可能性がありますが、他のアイテムは同じパスに従う必要はありません –