Logstash 配置说明
大家都知道我们ueba的服务端使用了ELK,这里就说一下其中的一个组件L,也就是Logstash的配置说明。
在说配置之前,需要大致的了解一下Logstash是什么,是做什么的。
Logstash 是开源的服务器端数据处理管道,能够同时从多个来源收集数据、过滤转换数据,然后将数据发送到存储库中(我们的存储库当然是 Elasticsearch),所以Logstash并不会存储任何日志数据。我们的客户端在采集到行为数据日志后,会首先发送给Logstash,Logstash进行数据的接收,并根据配置对数据进行过滤、转换的工作,处理完之后再发送给Elasticsearch进行最终的存储。
说到Logstash的数据处理过滤,处理过滤的条件在哪设置,怎么配置,是本次要说明的内容。
首先我们先了解 logstash 配置文件的一些知识。
1.Logstash 过滤规则都在配置文件中进行设置,配置文件地址和文件名如下:
.../isa/conf/logstash/logstash.conf
Linux 环境中使用 vi 工具进行编辑,如果 vi 不会用的,,请百度。
2.Logstash 配置文件的三个主要组成部分
input
:
顾名思义,输入,这里可以配置接收日志的协议和端口,如我们客户端发送过来就是通过 udp 协议的 6004 端口,这里你可以新增和修改接收日志的协议和端口,如 tcp、http 等常用协议。
input {
udp {
port => 1234
codec => "json"
}
udp {
port => 6004
codec => "json"
}
}
filter
:
过滤,logstash 处理的规则主要就是在该模块中进行编辑
filter {
translate {
field => "action"
dictionary => [ "w", "窗口",
"mr", "鼠标",
"ml", "鼠标",
"mc", "鼠标",
"inputbox", "输入框",
"cmd", "命令",
"net", "网络",
"web", "网页",
"fcreate", "新建",
"fopen", "打开",
"fdel", "删除",
"fmodify", "修改",
"frename", "重命名",
"fcopy", "文件复制",
"print", "打印",
"fcloud", "云盘",
"ftp", "ftp",
"mail", "邮件",
"im", "通讯",
"shell", "shell命令",
"fmount", "移动介质",
"bevent", "页面事件",
"cp", "COPY",
"robot", "RAP流程",
"screenshot", "截屏"]
fallback => "%{action}"
destination => "action_s"
}
}
output
:
输出,配置将处理后的数据输出到什么地方,可以是 el 的索引文件,也可以是本地文件,也可以是 redis,也可以是邮件等
output {
elasticsearch {
hosts => ["127.0.0.1:6200"]
index => "ueba-log-index"
document_type => "ueba-log"
}
}
下面来介绍一些常用的配置介绍:
校验配置文件有效性
如果 logstash 配置文件中语法不对,是无法重启成果 logstash 服务的,所以可以先用该命令尝试是否没有语法错误
.../logstash/bin/logstash -t -f .../isa/conf/logstash/logstash.conf
通过匹配到字段的内容,新建一个字段并赋值(filter 中配置)
if [content] =~ "cmd" {
mutate { add_field => { "cmd_lable" => "运维" } }
}else if [content] =~ "sss" {
mutate { add_field => { "cmd_lable" => "运维sss" } }
}else if [content] =~ "xxx" {
mutate { add_field => { "cmd_lablexxx" => "运维xxx" } }
}
content 字段中包含 cmd,日志中增加字段:cmd_lable,显示内容:运维
content 字段中包含 sss,日志中增加字段:cmd_lable,显示内容:运维 sss
content 字段中包含 xxx,日志中增加字段:cmd_lablexxx,显示内容:运维 xxx
若为数值类型字段时,则 [content] == 119 即可,不需要加”"
通过匹配到字段的内容,新建一个字段并赋值(filter 中配置)
以下这个案例中,完成的内容是:我们在 tag_category 为 T_START 的日志中会采集到很多的字段信息,想把这些信息经过缓存、处理、计算,赋值给其它的日志。
比如开始标签中会采集到座席姓名、手机号码等信息,但是结束标签是采集不到的,通过缓存把开始标签中的座席姓名、手机号码等字段,赋值给结束标签。
再比如用来计算开始标签和结束标签之间的时长。
filter {
#当检测到tag_category字段的值为T_START的日志,则进入该aggregate
if "T_START" == [tag_category] {
aggregate {
#这里可设定一个唯一值的字段,用来标识
task_id => "%{session_no}"
code => "
#缓存中生成t_seat_uname(座席姓名),使用日志中已有t_seat_uname的值赋予
map['t_seat_uname'] = event.get('t_seat_uname’)
#缓存中生成t_phone_num(电话号码),使用日志中已有t_phone_num的值赋予
map['t_phone_num'] = event.get('t_phone_num’)
#缓存中生成t_tags_num(业务标签数),初始为0
map['t_tags_num'] = 0
#缓存中生成t_tags(业务标签数组),初始为数组
map['t_tags'] = Array.new
"
map_action => "create"
}
}
#当检测到tag_category字段的值不为T_STOP同时也不为T_START的日志,则进入该aggregate
if "T_STOP" != [tag_category] and "T_START" != [tag_category] {
aggregate {
task_id => "%{session_no}"
code => "
#当action为tag(业务标签)时,缓存中的t_tags_num字段值+1
map['t_tags_num'] +=1 if event.get('action') == 'tag’
#当action为tag(业务标签)时,将tag_name加入到t_tags的数组中
map['t_tags'].push(event.get('tag_name')) if event.get('action') == ‘tag'
#日志生成新字段t_seat_uname
event.set('t_seat_uname' , map['t_seat_uname'])
#日志生成新字段t_phone_num
event.set('t_phone_num' , map['t_phone_num'])
"
map_action => "update"
}
}
#当检测到tag_category字段的值为T_STOP的日志,则进入该aggregate
if "T_STOP" == [tag_category] {
aggregate {
task_id => "%{session_no}"
code => "
#缓存t_duration获取总时长(秒),当前offsettime偏移时间-会话开始时的t_first_offsettime偏移时间
map['t_duration'] = event.get('offsettime')-map['t_first_offsettime']
#日志生成新字段t_seat_uname
event.set('t_seat_uname' , map['t_seat_uname'])
#t_duration转化为秒
event.set('t_duration' , (map['t_duration']/1000).round(2))
#日志生成新字段t_tags_num
event.set('t_tags_num' , map['t_tags_num'])
#日志生成新字段t_tags
event.set('t_tags' , map['t_tags'])
"
map_action => "update"
end_of_task => true
}
}
输出日志到本地时间命名的文件中(output 中配置)`
output {
elasticsearch {
hosts => ["127.0.0.1"]
index => "ueba-log-index"
document_type => "ueba-log"
}
file{
path => "/test/isa/conf/logstash-file-%{+YYYY.MM.DD}"
}
}
丢弃相关类型日志(output 中配置)
有些用户不需要的日志可以丢弃掉,减少存储及并发处理的压力
output {
if [action] != 'k' and [action] != 'web' and [action] != 'ml'
{
elasticsearch {
hosts => ["127.0.0.1"]
index => "ueba-log-index"
document_type => "ueba-log"
}
}
}
公司会配置 logstash 的人非常少,大家学一下,功能强大的超乎想象,无需研发,即可实现大量数据转换处理功能
logstash 配置语法是基于 ruby 的,ruby 也是类似 python 的一种新生代语言,大家再学习 logstash 配置的同时,也顺便学习下 ruby
现在世界语言排名如下: