私はRedisとELKスタックを持っています。 スキームは以下のとおりです。logstash - >のRedis - > logstash(インデクサ) - > elasticsearch - > kibana弾性が下がる
LogstashインデクサのRedisからデータを取得し、弾性にそれを置く:
はinput {
redis {
host=>"redis"
type=>"redis-input"
data_type=>"list"
key=>"logstash"
}
}
filter {
geoip {
source=>"ipaddr"
target=>"geoip"
database=>"/GeoLiteCity.dat"
add_field=>["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field=>["[geoip][coordinates]","%{[geoip][latitude]}"]
}
mutate {
remove_field=>["message","@version","timestamp"]
convert=>{"[geoip][coordinates]"=>"float"}
}
}
output {
elasticsearch {
template=>"/typing-template.json"
template_overwrite=>true
hosts=>["elasticsearch:9200"]
}
}
Iをログに記録する4サーバーがあります収集したい。ここではそのlogstash confにある:
input {
file {
path => [ "C:/Program Files (x86)/*/logs/*.log", "C:/Program Files (x86)/**/logs/*.log", "C:/Program Files/***/logs/*.log", "C:/Program Files/****/logs/*.log" ]
start_position => "beginning"
type => "mtdclog"
ignore_older => 0
sincedb_path => "NUL"
}
}
filter {
grok { match => { "path" => "%{GREEDYDATA}/(?<logdate>[0-9]{8})\.log" }}
grok { match => [ "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': (?<event>login) \[ver: (?<client_build>[0-9\.]+)",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': (?<event>liveupdate) '%{GREEDYDATA:data}'",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': (?<event>check version)%{GREEDYDATA:data}",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': %{GREEDYDATA:data}",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}(?<event>News):%{GREEDYDATA:data}",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t(?<event>unknown command) (?<command_code>[A-Z0-9]+)",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}(?<event>History):%{GREEDYDATA:data}",
"message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{GREEDYDATA:log_line}",
"message", "%{GREEDYDATA:log_line}"
]
}
mutate {
add_field => { "ts" => "%{logdate} %{logtime}"}
remove_field => [ "logdate", "logtime" ]
}
date {
match => [ "ts", "YYYYMMdd HH:mm:ss.SSS" ]
target => "@timestamp"
}
if [path] =~ "Pattern1" { mutate { add_field => { "dc_type" => "Pattern1" } }}
if [path] =~ "Pattern2" { mutate { add_field => { "dc_type" => "Pattern2" } }}
mutate { remove_field => [ "message", "@version", "ts", "path", "host" ]
add_field => { "location" => "somecity" }
convert => { "log_stream" => "integer"
"client_build" => "integer"
"account" => "integer"
}
}
}
output {
redis {
host => "xxx.yyy.zzz.aaa"
port => "6381"
data_type => "list"
key => "logstash" }
タスク:私は1ヶ月間の古いログを処理したい 。毎日のログファイルは約35MBです。したがって、4台のサーバからの総容量は140MBほどではありません。
問題: その後、私はlogstashサービスを開始します - すべてが良好で、4-5時間は正常に動作します。私は木場の解析されたデータを見て、それを扱うことができます。しかし、弾力性が低下します。 メッセージは「30000ms後の要求タイムアウト」です。
同じELKスタック私は別のサーバとlogstash configsに使用しています - それはgreateで動作し、より多くのログ行を処理します。しかし、私はこの場合のトラブルを理解することはできません。