This article is also available as a PDF download: https://download.zhoufengjie.cn/document/loganalysis/filebeat-logstash-ldnslog-uploadtoelasticsearch.pdf
0. Environment overview:
Environment setup: https://download.zhoufengjie.cn/document/loganalysis/elasticsearch-logstash-kibana-install.pdf
filebeat + logstash collect the (China Mobile) LDNS logs and upload them to elasticsearch; the data is then viewed through kibana.
filebeat host IP address: 192.168.0.105
logstash host IP address: 192.168.0.105
elasticsearch host IP address: 192.168.0.97
kibana host IP address: 192.168.0.97
The LDNS log format is defined as follows:
####
1. User IP: the IP of the user issuing the DNS request; both IPv4 and IPv6 addresses must be supported;
2. Requested domain: the domain the user queried (the value of the Query field);
3. Resolution time: the time at which the DNS server replied to the user;
4. A-record address: the first A-record IP address in the Answers field of the response packet;
5. Resolution result code (RCODE): in decimal, where 0-NOERROR, 1-FORMERR, 2-SERVFAIL, 3-NXDOMAIN, 4-NOTIMP, 5-REFUSED, 6~15-reserved;
6. Requested DNS record type: the query type (A, AAAA, CNAME, etc.), in decimal, where 1-A, 28-AAAA, 5-CNAME;
7. cname: all CNAME domains of the requested domain, stored in order and separated by semicolons;
8. AAAA-record address: the first AAAA-record IP address in the Answers field of the response packet;
9. Service IP: the IP address of the DNS server providing the service; provinces that need it should report the actual DNS cache address, provinces that do not may omit the value, but the field itself must be kept;
10. Resolution latency (optional);
####
Log samples:
####
2409:8a44:1e00:84a2:0e37:47ff:fe76:3eae|iesdouyin.com.|20191031002459||0|1|||2409:8088:0000:0000:0000:0000:0000:0008
223.88.236.41|sina.cn.|20191031002459|221.179.175.207|0|1|||211.138.24.66
223.88.189.98|www.sina.com.cn.|20191031002459|120.192.83.125|0|1|spool.grid.sinaedge.com.||211.138.24.66
223.104.108.208|wspeed.qq.com.|20191031002459||2|28|||211.138.24.66
2409:8945:7a40:2752:5055:cd86:b0f8:6333|k35.gslb.ksyuncdn.com.|20191031002459|111.7.69.2|0|1|||2409:8088:0000:0000:0000:0000:0000:0008
111.7.89.133|cdn.cloudforest.ltd.|20191031002459||0|28|||211.138.24.66
223.88.54.236|apissl.ksapisrv.com.|20191031002459|103.107.217.103|0|1|api.ksapisrv.com.;nls-kix.ksapisrv.com.||211.138.24.66
223.91.103.90|gs.a.sohu.com.|20191031002459||0|28|fjsyyd.a.sohu.com.|2409:8c00:3001:0000:0000:0000:0000:0004|211.138.24.66
223.90.15.3|www.w3.org.|20191031002459|128.30.52.100|0|1|||211.138.24.66
####
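As a quick way to see how the nine pipe-separated fields line up with the definition above, the awk sketch below splits one of the sample records. The printed labels (user_ip, qtype, and so on) are illustrative names only, not part of the spec.

# Split a sample record on '|' and label the fields in the order defined above
echo '223.88.236.41|sina.cn.|20191031002459|221.179.175.207|0|1|||211.138.24.66' \
  | awk -F'|' '{print "user_ip      = "$1;
                print "domain       = "$2;
                print "resolve_time = "$3;
                print "a_record     = "$4;
                print "rcode        = "$5;
                print "qtype        = "$6;
                print "cname        = "$7;
                print "aaaa_record  = "$8;
                print "server_ip    = "$9}'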
1. Deploying filebeat:
Edit /etc/filebeat/filebeat.yml (vi /etc/filebeat/filebeat.yml) and enter the following [on top of the existing nginx log collection, DNS log collection is added]:
filebeat.inputs:
- input_type: log
  paths:
    - /var/log/dns/dns*.log
  exclude_files: ['.gz$']
  tags: ["filebeat-dns-log"]
  document_type: dnslog
- input_type: log
  paths:
    - /var/log/nginx/access*.log
  exclude_files: ['.gz$']
  tags: ["filebeat-nginx-accesslog"]
  document_type: nginxaccess
- input_type: log
  paths:
    - /var/log/nginx/error*.log
  tags: ["filebeat-nginx-errorlog"]
  exclude_files: ['.gz$']
  document_type: nginxerror
tags: ["105-filebeat-logs"]
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
output.logstash:
  hosts: ["127.0.0.1:5044"]
#output:
##  console:
##    pretty: true
Temporary test [while testing, you can switch the output to console and watch what gets printed]:
/usr/bin/filebeat -e -c /etc/filebeat/filebeat.yml
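Besides the console trick above, filebeat releases from 6.x onward also provide `test` subcommands, so the YAML and the connection to logstash can be validated before the service is started; a minimal sketch:

# Check that filebeat.yml parses correctly
/usr/bin/filebeat test config -c /etc/filebeat/filebeat.yml
# Check that the logstash output at 127.0.0.1:5044 is reachable
/usr/bin/filebeat test output -c /etc/filebeat/filebeat.yml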
Start filebeat:
systemctl start filebeat
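If the host should keep shipping logs after a reboot, also enable the unit and confirm it is active:

systemctl enable filebeat
systemctl status filebeat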
2. Configuring logstash:
Edit /etc/logstash/conf.d/rec-filebeat-log.conf to create a logstash pipeline dedicated to the logs shipped by filebeat (nginx and DNS), which filters them and splits them into fields. Enter the following:
input {
  beats {
    port => 5044
    host => "127.0.0.1"
  }
}
filter {
  if "105-filebeat-logs" in [tags] {
    if "filebeat-dns-log" in [tags] {
      grok {
        match => { "message" => ["%{IP:[ldns][request][remote_ip]}\|%{HOSTNAME:[ldns][request][domain]}\|%{GREEDYDATA:content}"] }
        remove_field => "message"
        add_field => {"logtype"=>"ldnslogs"}
      }
      mutate {
        #add_field => { "read_timestamp" => "%{@timestamp}" }
        #convert => ["totaltime","float"]
        split => ["content","|"]
        add_field => { "[ldns][resolve][time]" => "%{[content][0]}" }
        add_field => { "[ldns][resolve][addr]" => "%{[content][1]}" }
        add_field => { "[ldns][resolve][code]" => "%{[content][2]}" }
        add_field => { "[ldns][request][type]" => "%{[content][3]}" }
        add_field => { "[ldns][resolve][cname]" => "%{[content][4]}" }
        add_field => { "[ldns][resolve][ipv6addr]" => "%{[content][5]}" }
        add_field => { "[ldns][resolve][serverip]" => "%{[content][6]}" }
        remove_field => "content"
      }
      geoip {
        source => "[ldns][request][remote_ip]"
        database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
        target => "[ldns][request][geoip]"
        fields => ["ip","country_name","region_name","city_name","latitude","longitude","region_code"]
        add_field => [ "[ldns][request][geoip][coordinates]" , "%{[ldns][request][geoip][longitude]}" ]
        add_field => [ "[ldns][request][geoip][coordinates]" , "%{[ldns][request][geoip][latitude]}" ]
      }
      geoip {
        source => "[ldns][resolve][addr]"
        database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
        target => "[ldns][resolve][geoip]"
        fields => ["ip","country_name","region_name","city_name","latitude","longitude","region_code"]
        add_field => [ "[ldns][resolve][geoip][coordinates]" , "%{[ldns][resolve][geoip][longitude]}" ]
        add_field => [ "[ldns][resolve][geoip][coordinates]" , "%{[ldns][resolve][geoip][latitude]}" ]
      }
    }
    if "filebeat-nginx-accesslog" in [tags] {
      grok {
        match => { "message" => ["%{DATA:[nginx][access][time]} %{DATA:[nginx][access][request_time]} %{IPORHOST:[nginx][access][remote_ip]} %{DATA:[nginx][access][upstream][cache_status]} %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} %{WORD:[nginx][access][method]} %{DATA:[nginx][access][scheme]} %{DATA:[nginx][access][domain]} %{DATA:[nginx][access][url]} %{DATA:[nginx][access][args]} %{DATA:[nginx][access][user_name]} %{DATA:[nginx][access][upstream][upstream_ip]} %{NUMBER:[nginx][access][upstream][response_code]} %{DATA:[nginx][access][upstream][response_time]} \"%{DATA:[nginx][access][upstream][content_type]}\" \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\" \"%{GREEDYDATA:[nginx][access][cookie]}\""] }
        remove_field => "message"
        add_field => {"logtype"=>"nginxLogs"}
      }
      grok {
        match => {"[nginx][access][url]" => "%{URIPATH:api}"}
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
        #convert => ["totaltime","float"]
      }
      #date {
      #  match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
      #  remove_field => "[nginx][access][time]"
      #}
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    if "filebeat-nginx-errorlog" in [tags] {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
        add_field => {"logtype"=>"nginxLogs"}
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }
}
output {
  if "105-filebeat-logs" in [tags] {
    if [logtype] == "ldnslogs" {
      elasticsearch {
        hosts => ["192.168.0.97:9200"]
        manage_template => false
        index => "ldnslogs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      }
      #if [logtype] == "nginxLogs" {
      #  elasticsearch {
      #    hosts => ["192.168.0.97:9200"]
      #    manage_template => false
      #    index => "nginxlogs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      #  }
      #stdout {
      #  codec => rubydebug
      #}
    }
  }
}
Note: grok pattern reference: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
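Before restarting, the new pipeline file can be checked for syntax errors. A minimal sketch, assuming the standard package layout with the logstash binary under /usr/share/logstash and settings under /etc/logstash:

# Parse the pipeline and exit; prints "Configuration OK" when the file is valid
/usr/share/logstash/bin/logstash --path.settings /etc/logstash \
  -f /etc/logstash/conf.d/rec-filebeat-log.conf --config.test_and_exit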
Restart logstash:
systemctl restart logstash
3. Viewing the logs in kibana
Management => Index Patterns => Create index pattern => ldnslogs-filebeat* (matching the ldnslogs-... index name set in the logstash output)
You can then analyze the data further; click Discover to browse the logs that have been uploaded to elasticsearch.
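If nothing shows up in Discover, it is worth checking on the elasticsearch side whether the index was created at all; a quick sketch against the cluster address from section 0:

# List indices and look for the ldnslogs-* index written by the logstash output above
curl -s 'http://192.168.0.97:9200/_cat/indices?v' | grep ldnslogs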