2016-05-20 4 views
3

私はいくつかのシステムを流れるメッセージを持っています。各システムはメッセージのエントリを記録し、タイムスタンプとuuidのメッセージIDで終了します。私は今、これらのイベント持っている結果イベント間の時間の計算

filebeat --> logstash --> elastic search --> kibana 

:私は、を介してすべてのログを摂取してい

@timestamp      messageId        event 
May 19th 2016, 02:55:29.003  00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 
May 19th 2016, 02:55:29.200  00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 
May 19th 2016, 02:55:29.205  00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 
May 19th 2016, 02:55:29.453  00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit 

を私がレポートで過ごした時間の(理想的に積み重ね棒グラフまたは列)を生成したいと思います各システム:

messageId        in1:1->2:in2 
00e02f2f-32d5-9509-870a-f80e54dc8775 197:5:248 

これを行うにはどのような方法が最適ですか?ログシュートフィルター?木場計算フィールド?

+0

logstash [集合フィルタ(https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html) – Val

+0

@valに記載の本の良い例がありますこの例では、経過時間がログ行にある必要があります。2つの別々のログ行から計算しません。たぶん私は 'Elapsed {}'プラグインと一緒にそれを何らかの形で使うことができます。 – RaGe

答えて

9

これはLogstash aggregate filterのみで実現できますが、elapsed filterが既に行っていることを実質的に再実装する必要があります。

次に、Logstash aggregate filterelapsed filterの組み合わせを使用してみましょう。後者は各ステージの時間を測定するために使用され、前者はすべてのタイミング情報を最後のイベントに集計するために使用されます。

サイドノート:タイムスタンプ形式を再考して、解析のための標準とすることができます。私は簡単に解析できるようにISO 8601に変換しましたが、独自の正規表現を自由に使うことができます。

だから私は、次のログから始めている:私はすべて収集するために、3つのelapsedフィルタ(各段階in1に1つ、1->2in2)、その後、3つの集計のフィルタを使用してい

2016-05-19T02:55:29.003 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 
2016-05-19T02:55:29.200 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 
2016-05-19T02:55:29.205 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 
2016-05-19T02:55:29.453 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit 

ファーストタイミング情報それは次のようになります。3番目のイベントの後

{ 
       "@timestamp" => "2016-05-21T04:20:51.731Z", 
         "tags" => [ "elapsed", "elapsed_match", "in1" ], 
       "elapsed_time" => 0.197, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.003Z" 
} 

、最初の2つのイベントの後

filter { 
    grok { 
    match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] 
    add_tag => [ "%{event}" ] 
    } 
    date { 
    match => [ "timestamp", "ISO8601"] 
    } 
    # Measures the execution time of system1 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Enter" 
    end_tag => "system1Exit" 
    new_event_on_match => true 
    add_tag => ["in1"] 
    } 
    # Measures the execution time of system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system2Enter" 
    end_tag => "system2Exit" 
    new_event_on_match => true 
    add_tag => ["in2"] 
    } 
    # Measures the time between system1 and system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Exit" 
    end_tag => "system2Enter" 
    new_event_on_match => true 
    add_tag => ["1->2"] 
    } 
    # Records the execution time of system1 
    if "in1" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] = [(event['elapsed_time']*1000).to_i]" 
     map_action => "create" 
    } 
    } 
    # Records the time between system1 and system2 
    if "1->2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event['elapsed_time']*1000).to_i" 
     map_action => "update" 
    } 
    } 
    # Records the execution time of system2 
    if "in2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event['elapsed_time']*1000).to_i; event['report'] = map['report'].join(':')" 
     map_action => "update" 
     end_of_task => true 
    } 
    } 
} 

、あなたは197msがSYSTEM1に費やされていることを示している。このような新しいイベントを取得しますあなたはSYSTEM1とSYSTEM2の間で費やされているどのくらいの時間を示し、このようなイベント、すなわち5msの買ってあげる:第四イベント後

{ 
       "@timestamp" => "2016-05-21T04:20:51.734Z", 
         "tags" => [ "elapsed", "elapsed_match", "1->2" ], 
       "elapsed_time" => 0.005, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.200Z" 
} 

を、あなたはどのように示して、このような新しいイベントを、取得します多くのティ私はsystem2、すなわち248msに費やされました。そのイベントはまた、私はlogstash 5.4で、この作品のためにいくつかの微調整をしなければならなかったメッセージ

{ 
       "@timestamp" => "2016-05-21T04:20:51.736Z", 
         "tags" => [ "elapsed", "elapsed_match", "in2" ], 
       "elapsed_time" => 0.248, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.205Z" 
        "report" => "197:5:248" 
} 
+0

おかげさまで、私は同じウサギの穴を下っていました。私が経過したフィルタについて気づいたことは、それぞれのログ行がいくつかの経過したフィルタのうちの1つと一致するため、一致しないすべてのものが '_grokparsefailure'タグを追加することです。私は 'tag_on_failure => []'を経過フィルタに追加しました。 – RaGe

+0

あなたはこの作品を期待通りに作れましたか? – Val

+0

私は 'to_i 'で見ているエラーを解明しようとしています。明らかにマップ行の入力は文字列ではなく配列です。 '集計例外が発生しました。エラー:[197.0]の未定義メソッド 'to_i:Array' – RaGe

1

のすべてのタイミング情報とreportフィールドが含まれ、ここに改訂コードです。

filter { 
    grok { 
    match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] 
    add_tag => [ "%{event}" ] 
    } 
    date { 
    match => [ "timestamp", "ISO8601"] 
    } 
    # Measures the execution time of system1 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Enter" 
    end_tag => "system1Exit" 
    new_event_on_match => true 
    add_tag => ["in1"] 
    } 
    # Measures the time between system1 and system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Exit" 
    end_tag => "system2Enter" 
    new_event_on_match => true 
    add_tag => ["1->2"] 
    } 
    # Measures the execution time of system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system2Enter" 
    end_tag => "system2Exit" 
    new_event_on_match => true 
    add_tag => ["in2"] 
    } 
    # Records the execution time of system1 
    if "in1" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] = (event.get('elapsed_time')*1000).to_i" 
     map_action => "create" 
    } 
    } 
    # Records the time between system1 and system2 
    if "1->2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event.get('elapsed_time')*1000).to_i" 
     map_action => "update" 
    } 
    } 
    # Records the execution time of system2 
    if "in2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event.get('elapsed_time')*1000).to_i" 
     map_action => "update" 
     end_of_task => true 
    } 
    } 
} 
+0

経過フィルタと集計フィルタは正しく動作しません複数のlogstashワーカーが使用されている場合その場合、同じ問題をどのように解決するのでしょうか?ありがとう。 –

+0

1つの作業を使用するようにします。これはボトルネックになる可能性がありますが、他のアイテムは同じパスに従う必要はありません –