在上一篇系列文章《【ES私房菜】收集 Linuix 系统日志》我们已经完成了Linux系统日志上报ES这个简单的试运行项目,我们现在对数据收集、处理以及上报等流程也有了一个全局的认知和了解,下面,我们一起看看ES如何收集Apache日志。
虽然logstash也能处理我们常见的WEB日志格式,但是需要写正则匹配,比较麻烦。幸好Apache支持自定义日志格式,所以这里我们对Apache日志格式进行了下改造,让它输出json格式。
WEB日志的字段很多,所以第一步,我们需要先对齐到底需要采集哪些字段,然后将日志格式定下来,这里我简单分享下网管这边设置的字段,如果想了解其他字段,请参考本文最后的附录。
Apache 日志中的客户端IP不能像Nginx那样使用map模块灵活获取,不管是使用IP变量 %a(直连IP) 还是 X-real-IP(非权威真实IP) 都无法覆盖所有请求场景。
所以,这里我们需要特殊处理下:
LogFormat "{ \
\"@timestamp\": \"%{%Y-%m-%dT%H:%M:%S%z}t\", \
\"client_ip\": \"%{X-Forwarded-For}i\", \
\"direct_ip\": \""%a\", \
\"request_time\": %T, \
\"status\": %>s, \
\"url\": \"%U%q\", \
\"method\": \"%m\", \
\"http_host\": \"%{Host}i\", \
\"server_ip\": \"%A\", \
\"http_referer\": \"%{Referer}i\", \
\"http_user_agent\": \"%{User-agent}i\", \
\"body_bytes_sent\": \"%B\", \
\"total_bytes_sent\": \"%O\" \
}" access_log_json
我们在日志里面指定2种远端IP,一个是代理叠加而成的IP列表:X-Forwarded-For,另一个是直连的远程IP:%a,当用户是直接访问WEB,而没有经过Haproxy等代理时,X-Forwarded-For 和 %a 应该是同一个IP,而经过代理的访问,则%a是X-Forwarded-For的一个子集。
最终,我们可以在logstash里面做一些逻辑处理,得到用户的真实IP。
由于字段中用到了 body_bytes_sent 和 total_bytes_sent 发送字节数统计字段,所以这里需要给Apache集成一下mod_logio.so模块,如果不需要这2个字段,则跳过此步骤。
这里分享下这个模块的集成过程:
①、下载Apache源码并解压
②、cd 到Apache源码目录下的 modules/loggers 目录,执行如下命令:
# 这里注意下现有 apache 的安装目录
/usr/local/apache2/bin/apxs -c -n -i -a mod_logio.c
正常情况下,会自动在 /usr/local/apache2/modules/ 下生成 mod_logio.so,并自动在httpd.conf里面Load这个模块,如果发现没有生成或加载,则手工操作:
# 编译后会在.libs目录下生成,我们手工拷贝到modules目录:
cp -f .libs/mod_logio.so /usr/local/apache2/modules
# 然后编辑 Apache 配置文件 httpd.conf ,新增如下配置:
LoadModule logio_module modules/mod_logio.so
③、配置日志:
将第①步设计好的日志格式以及日志文件配置添加到 httpd.conf ,比如:
# 自定义日志格式
LogFormat "{\"@timestamp\":\"%{%Y-%m-%dT%H:%M:%S%z}t\",\"client_ip\":\"%a\",\"request_time\":%T,\"status\":%>s,\"url\":\"%U%q\",\"method\":\"%m\",\"http_host\":\"%{Host}i\",\"server_ip\":\"%A\",\"http_referer\":\"%{Referer}i\",\"http_user_agent\":\"%{User-agent}i\",\"body_bytes_sent\":\"%B\",\"total_bytes_sent\":\"%O\"}" access_log_json
# 定义日志文件
CustomLog "|/usr/local/apache2/bin/rotatelogs -l /data/wwwlogs/access_%Y%m%d.log 86400" access_log_json
如果,现网采用的是虚拟主机模式,也就是配置了 httpd-vhost.conf 文件,那还得检查下这个文件中是否也配置 CustomLog,如果有配置,要么注释,要么参考上述CustomLog做相应的修改。
④、热重载Apache:
/usr/local/apache2/bin/apachectl -k graceful
Tips:使用graceful指令可以平滑重启Apache,如果配置有错误,Apache也不会异常退出。
Ps:当然,这里是按天在/data/wwwlogs下生成日志文件,所以要记得创建这个目录,并根据httpd运行的用户赋予权限,避免日志无法写入。
成功配置日志之后,日志输出长这样:
格式化之后:
{
"@timestamp": "2017-09-15T18:45:31+0800",
"client_ip": "10.64.64.128",
"request_time": 4,
"status": 200,
"url": "/index.php/netcontrol/command_tool/execute_command",
"method": "POST",
"http_host": "sniper.oa.com",
"server_ip": "172.16.x.xxx",
"http_referer": "http://sniper.oa.com/netcontrol/command_tool/home",
"http_user_agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 SmartProxyAgent/1.1.5.32:taothe-NB1",
"body_bytes_sent": "30315",
"total_bytes_sent": "30759"
}
这时候,我们还可以根据日志格式做相应调整,最终满足我们的分析需求。
按照《【ES私房菜】Filebeat安装部署及配置详解》在需要采集WEB日志的服务器上部署filebeat,然后编写如下配置:
vim filebeat.yml
############################# input #########################################
filebeat.prospectors:
- input_type: log
paths: /data/wwwlogs/access-*.log
document_type: "web_access_log"
spool_size: 1024
idle_timeout: "5s"
name: 172.16.x.xxx
############################# Kafka #########################################
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["x.x.x.1:9092","x.x.x.2:9092","x.x.x.3:9092"]
# message topic selection + partitioning
topic: '%{[type]}'
flush_interval: 1s
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
############################# Logging #########################################
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
path: /data/filebeat/logs
name: filebeat.log
keepfiles: 7
在正式上报数据之前,我们先配置下ES的template:
{
"template": "web_access_log-*",
"mappings": {
"log": {
"properties": {
"@timestamp": {
"include_in_all" : false,
"type" : "date"
},
"server_ip": {
"index": "not_analyzed",
"type": "ip"
},
"http_host": {
"index": "not_analyzed",
"type": "string"
},
"client_ip": {
"index": "not_analyzed",
"type": "ip"
},
"method": {
"type": "string"
},
"url": {
"type": "string"
},
"http_referer": {
"type": "string"
},
"body_bytes_sent": {
"index": "not_analyzed",
"type": "long"
},
"total_bytes_sent": {
"index": "not_analyzed",
"type": "long"
},
"status": {
"index": "not_analyzed",
"type": "long"
},
"request_time": {
"index": "not_analyzed",
"type": "double"
},
"http_user_agent": {
"type":"string"
}
}
}
}
}
Ps:这里就不详细说明每个字段含义了,请参考系列文章《ElastiSearch template简介(整理中)》.
将上述模板保存为 web.json 的文件,然后执行如下命令进行导入:
curl -XPUT http://x.x.x.x:9200/_template/template-web_access_log -d @web.json
正常将返回如下结果:
{
"acknowledged" : true
}
四、配置logstash
模板导入之后,我们再配置 logstash。
vim logstash.conf
input {
kafka {
bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
topics => "web_access_log"
group_id => "logstash"
codec => json {
charset => "UTF-8"
}
add_field => { "[@metadata][type]" => "web_access_log" }
}
}
filter {
if [@metadata][type] == "web_access_log" {
# 这里对UTF-8单字节编码做了下替换处理,否则URL有中文会出现json无法解析报错
mutate {
gsub => ["message", "\\x", "\\\x"]
}
# 这里排除了下HEAD请求,如需要排除其他关键词,可自行添加
if ( 'method":"HEAD' in [message] ) {
drop {}
}
json {
source => "message"
remove_field => "message"
# 这里添加一个变量,用于后面的判断
add_field => { "[@metadata][direct_ip]" => "%{direct_ip}"}
remove_field => "[beat][hostname]"
remove_field => "[beat][name]"
remove_field => "@version"
remove_field => "[beat][version]"
remove_field => "input_type"
remove_field => "offset"
remove_field => "tags"
remove_field => "type"
remove_field => "host"
}
# 对client_ip 按逗号进行切分(X-Forwarded-For是多个IP,当然单个IP这里也会切分)
mutate {
split => ["client_ip", ","]
}
# 取出切分数组的第一个值(即真实IP),并替换给client_ip(此时列表已还原为单个IP)
# 注意:这个mutate必须分开写,写到一个里面是不行的!
mutate {
replace => { "client_ip" => "%{client_ip[0]}" }
}
# 当client_ip值为-的时候(X-For为空的情况)
if [client_ip] == "-" {
# 当 direct_ip 不为空(因为Nginx并没有报这个字段)
if [@metadata][direct_ip] not in ["%{direct_ip}","-"] {
# 把 direct_ip 的值赋给client_ip,完成对调
mutate {
replace => { "client_ip" => "%{direct_ip}" }
}
# 极端情况下,直接删除本条数据(命中client_ip和direct_ip都不对)
} else {
drop{}
}
}
# 移除 direct_ip 这个字段
mutate {
remove_field => "direct_ip"
}
}
}
output {
#stdout{
# codec => rubydebug
#}
if [@metadata][type] == "web_access_log" {
elasticsearch {
hosts => ["x.x.x.x:9200"]
index => "web_access_log-%{+YYYY.MM.dd}"
# 禁止logstash管理模板,并指定es模板
manage_template => false
template_name => "template-web_access_log"
}
}
}
可以看到,在filter部分对数据进行了较为复杂的逻辑处理,最终实现了获取用户真实IP的目标,而且这个规则同时兼容Nginx日志处理。
启动logstash上报数据之后,我们还需要在kibana里面配置下索引:
①、如图打开索引管理:
②、如图点击创建索引:
③、如图输入logstash指定的索引前缀,自动带出字段后选择时间戳字段,点击【Create】即可:
最后,回到Discover界面就能看到期待已久的高清美图了:
本文就介绍这么多,更多Kibana的奇淫巧计请关注《ES私房菜系列文章之教你玩转Kibana(整理中)》。
Apache日志格式字符串的含义
%% 百分号(Apache2.0.44或更高的版本)
%a 远端IP地址
%A 本机IP地址
%B 除HTTP头以外传送的字节数
%b 以CLF格式显示的除HTTP头以外传送的字节数,也就是当没有字节传送时显示’-‘而不是0。
%{Foobar}C 在请求中传送给服务端的cookieFoobar的内容。
%D 服务器处理本请求所用时间,以微为单位。
%{FOOBAR}e 环境变量FOOBAR的值
%f 文件名
%h 远端主机
%H 请求使用的协议
%{Foobar}i 发送到服务器的请求头Foobar:的内容。
%l 远端登录名(由identd而来,如果支持的话),除非IdentityCheck设为”On“,否则将得到一个”-”。
%m 请求的方法
%{Foobar}n 来自另一个模块的注解Foobar的内容。
%{Foobar}o 应答头Foobar:的内容。
%p 服务器服务于该请求的标准端口。
%P 为本请求提供服务的子进程的PID。
%{format}P 服务于该请求的PID或TID(线程ID),format的取值范围为:pid和tid(2.0.46及以后版本)以及hextid(需要APR1.2.0及以上版本)
%q 查询字符串(若存在则由一个”?“引导,否则返回空串)
%r 请求的第一行
%s 状态。对于内部重定向的请求,这个状态指的是原始请求的状态,—%>s则指的是最后请求的状态。
%t 时间,用普通日志时间格式(标准英语格式)
%{format}t 时间,用strftime(3)指定的格式表示的时间。(默认情况下按本地化格式)
%T 处理完请求所花时间,以秒为单位。
%u 远程用户名(根据验证信息而来;如果返回status(%s)为401,可能是假的)
%U 请求的URL路径,不包含查询字符串。
%v 对该请求提供服务的标准ServerName。
%V 根据UseCanonicalName指令设定的服务器名称。
%X 请求完成时的连接状态:
X= 连接在应答完成前中断。
+= 应答传送完后继续保持连接。
-= 应答传送完后关闭连接。
(在1.3以后的版本中,这个指令是%c,但这样就和过去的SSL语法:%{var}c冲突了)
%I 接收的字节数,包括请求头的数据,并且不能为零。要使用这个指令你必须启用mod_logio模块。
%O 发送的字节数,包括请求头的数据,并且不能为零。要使用这个指令你必须启用mod_logio模块。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。