[elk@localhost logstash-5.4.0]$
vi nginx.conf    # 这里新生成一个配置文件
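# Logstash 5.4 pipeline: Kafka (JSON-encoded events) -> grok / enrichment -> Elasticsearch.
#
# Fix versus the original file: the mutate, geoip and useragent stages referred to
# fields ("status", "body_bytes_sent", "remote_addr", "http_user_agent") that the
# grok pattern never produces, so those stages silently did nothing. They now use
# the names the grok captures actually create (response, bytes_read, clientip, agent).
input {
  kafka {
    codec             => "json"
    # subscribe to every topic named logstash-<anything>
    topics_pattern    => "logstash-.*"
    bootstrap_servers => "192.168.12.105:9092"
    # when the consumer group has no committed offset, start from the newest records
    auto_offset_reset => "latest"
    group_id          => "logstash-g1"
  }
}

filter {
  if "nginx-accesslog" in [tags] {
    grok {
      # Custom nginx log_format. Captures: http_host, clientip, remote_user, timestamp,
      # http_verb, http_request, http_version (or raw_http_request), response, bytes_read,
      # referrer, agent, xforwardedfor, request_time (cast to float by grok), traceID.
      # referrer / agent / xforwardedfor are client-supplied strings; treat them as untrusted.
      match => { "message" => "%{IPORHOST:http_host} %{IPORHOST:clientip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:http_verb} %{NOTSPACE:http_request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" %{NUMBER:response} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{QS:agent} %{QS:xforwardedfor} %{NUMBER:request_time:float} %{GREEDYDATA:traceID}" }
      # non-matching lines keep the default _grokparsefailure tag, so they stay visible in ES
    }
    mutate {
      # grok emits these as strings; cast so ES maps them numerically.
      # request_time needs no conversion: the grok pattern already casts it to float.
      convert => {
        "response"   => "integer"
        "bytes_read" => "integer"
      }
    }
    geoip {
      # NOTE(review): if a reverse proxy sits in front of nginx, clientip is the proxy
      # address and the real client is in xforwardedfor — confirm against the deployment.
      source => "clientip"
    }
    date {
      # HTTPDATE as captured above, e.g. 10/Oct/2000:13:55:36 -0700
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
    }
    useragent {
      # QS keeps the surrounding quotes; the useragent filter tolerates them
      source => "agent"
    }
  }

  if "tomcat-accesslog" in [tags] {
    grok {
      # Tomcat access log. Captures: clientip, timestamp, http_verb, http_request,
      # http_version (or raw_http_request), response, bytes_read, referrer,
      # request_time (float), traceID.
      match => { "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:http_verb} %{NOTSPACE:http_request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" %{NUMBER:response} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{NUMBER:request_time:float} %{GREEDYDATA:traceID}" }
    }
    mutate {
      # same casts as the nginx branch so both index with the same mapping
      convert => {
        "response"   => "integer"
        "bytes_read" => "integer"
      }
    }
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
    }
  }
}

output {
  elasticsearch {
    # plain HTTP, no user/password or ssl options set
    hosts => [ "192.168.12.109:9200" ]
    # [type] must arrive in the Kafka JSON event (nothing in this pipeline sets it);
    # an event without it is indexed under the literal name "logstash-%{type}-...".
    index         => "logstash-%{type}-%{+YYYY.MM.dd}"
    document_type => "%{type}"
  }
}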
保存,并启动
[elk@localhost logstash-5.4.0]$
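# Validate nginx.conf before detaching: a config error in the nohup'd process would
# otherwise only be discoverable by reading nohup.out after it has already exited.
# --config.test_and_exit parses the pipeline and exits without starting it.
if bin/logstash -f nginx.conf --config.test_and_exit; then
  nohup bin/logstash -f nginx.conf &
fi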