2016-03-23 18 views
0

私のログファイルには、httpログファイルのようにウェブサイトの階層が表示されるような、アイテムの階層を表すデータがあります。階層的なデータ照合と表示

私は、このようなこの

41 2016-01-01 01:41:32-500 show:category:all 
41 2016-01-01 04:11:20-500 show:category:animals 
42 2016-01-02 01:41:32-500 show:item:wallaby 
42 2016-01-02 01:41:32-500 show:home 

などのデータを有していてもよく、私は...ここで%{NUMBER:terminal}%{TIMESTAMP_ISO8601:ts}(?<info>([^\r])*)

を3項目を持つことになり、私はmutateを使用して配列に情報データを解析し、 splitを入力してlvl1:lvl2:lvl3['lvl1','lvl2','lvl3']に変換してください。

私はinfo[0]が同じであるか、またはinfo[0]info[1]が同じであるすべてのレコードを数えるなど、さまざまなレベルでカウントを簡単に取得するためにデータを集計することに興味があります。 (時間範囲と端末を選択できるようにする)

この種の情報を可視化する方法はありますか? また、データにアクセスしやすくするためにフィルタがデータと一致する方法を変更する必要がありますか? レベルの深さはさまざまですが、最大レベルが5であることはかなり確信で​​きます。したがって、テキストをさまざまなフィールドに解析することができます。lvl1lvl2lvl3lvl4lvl5

答えて

0

あなたの質問によると、私はあなたのデータ解析方法に同意します。しかし、私はそれを直接集計可能にするためにさらに追加したいと思います&は、木場を使用して視覚化します。

アプローチがなければならない: -

  • フィルタ%を使用してデータを{NUMBER:ターミナル}%{TIMESTAMP_ISO8601:TS}と(?([^ \ R])*){によって与えられた情報を1としてあなた}
  • 変異し
  • フィルター

あなたが言及したようにその後のmutate &フィルタを使用した後、あなたは配列{の観点からデータを取得します}

  • 今度はadd_field =>言及することにより、レベル1としてフィールドを追加することができる[ "フィールド名" を、 "%{[arrayname] [0]}"]
  • は今度はadd_fieldに言及することによってレベル2にフィールドを追加することができ
  • add_field => ["fieldname"、 "%{[arrayname] [2]"という名前を付けてレベル3としてフィールドを追加することができます。 }」]

このような情報を可視化するには、直接「きばら」を使用できます。私がやってしまったものだ

+0

。また、{info_1} =〜/%\ {info \ [0 \] \}/{mutate {remove_field => ["info_1"]}} 'の各配列フィールドに別のフィルタを追加しました。空のフィールド – Daniel

+0

クール。そのような解決策を探している他の人たちのために完全なlogstash構成ファイルを共有することができれば幸いです。 –

0

私の解決策

input { 
    file { 
     path => "C:/Temp/zipped/*.txt" 
     start_position => beginning 
     ignore_older => 0 
     sincedb_path => "C:/temp/logstash_temp2.sincedb" 
    } 
} 

filter { 
    grok { 
    match => ["message","^%{NOTSPACE}\[%{NUMBER:terminal_id}\] %{NUMBER:log_level} %{NUMBER} %{TIMESTAMP_ISO8601:ts} \[(?<facility>([^\]]*))\] (?<lvl>([^$|\r])*)"] 
    } 
    mutate { 
     split => ["lvl", ":"] 
     add_field => {"lvl_1" => "%{lvl[0]}"} 
     add_field => {"lvl_2" => "%{lvl[1]}"} 
     add_field => {"lvl_3" => "%{lvl[2]}"} 
     add_field => {"lvl_4" => "%{lvl[3]}"} 
     add_field => {"lvl_5" => "%{lvl[4]}"} 
     add_field => {"lvl_6" => "%{lvl[5]}"} 
     add_field => {"lvl_7" => "%{lvl[6]}"} 
     add_field => {"lvl_8" => "%{lvl[7]}"} 
     lowercase => [ "terminal_id" ] # set to lowercase so that it can be used for index - additional filtering may be required 
    } 
    date { 
    match => ["ts", "YYYY-MM-DD HH:mm:ssZZ"] 
    } 
} 
filter { 
    if [lvl_1] =~ /%\{lvl\[0\]\}/ {mutate {remove_field => [ "lvl_1" ]}} 
    if [lvl_2] =~ /%\{lvl\[1\]\}/ {mutate {remove_field => [ "lvl_2" ]}} 
    if [lvl_3] =~ /%\{lvl\[2\]\}/ {mutate {remove_field => [ "lvl_3" ]}} 
    if [lvl_4] =~ /%\{lvl\[3\]\}/ {mutate {remove_field => [ "lvl_4" ]}} 
    if [lvl_5] =~ /%\{lvl\[4\]\}/ {mutate {remove_field => [ "lvl_5" ]}} 
    if [lvl_6] =~ /%\{lvl\[5\]\}/ {mutate {remove_field => [ "lvl_6" ]}} 
    if [lvl_7] =~ /%\{lvl\[6\]\}/ {mutate {remove_field => [ "lvl_7" ]}} 
    if [lvl_8] =~ /%\{lvl\[7\]\}/ {mutate {remove_field => [ "lvl_8" ]}} 
    mutate{ 
    remove_field => [ "lvl","host","ts" ] # do not keep this data 
    } 
} 


output { 
    if [facility] == "mydata" { 
    elasticsearch { 
     hosts => ["localhost:9200"] 
     index => "logstash-mydata-%{terminal_id}-%{+YYYY.MM.DD}" 
    } 
    } else { 
    elasticsearch { 
     hosts => ["localhost:9200"] 
     index => "logstash-other-%{terminal_id}-%{+YYYY.MM.DD}" 
    } 
    } 
    # stdout { codec => rubydebug } 
} 
関連する問題