Logstash 配置说明

  大家都知道我们ueba的服务端使用了ELK,这里就说一下其中的一个组件L,也就是Logstash的配置说明。
  在说配置之前,需要大致的了解一下Logstash是什么,是做什么的。
  Logstash 是开源的服务器端数据处理管道,能够同时从多个来源收集数据、过滤转换数据,然后将数据发送到存储库中(我们的存储库当然是 Elasticsearch),所以Logstash并不会存储任何日志数据。我们的客户端在采集到行为数据日志后,会首先发送给Logstash,Logstash进行数据的接收,并根据配置对数据进行过滤、转换的工作,处理完之后再发送给Elasticsearch进行最终的存储。
  说到Logstash的数据处理过滤,处理过滤的条件在哪设置,怎么配置,是本次要说明的内容。

首先我们先了解 logstash 配置文件的一些知识。

1.Logstash 过滤规则都在配置文件中进行设置,配置文件地址和文件名如下:

 .../isa/conf/logstash/logstash.conf

Linux 环境中使用 vi 工具进行编辑,如果 vi 不会用的,,请百度。

2.Logstash 配置文件的三个主要组成部分

input
顾名思义,输入,这里可以配置接收日志的协议和端口,如我们客户端发送过来就是通过 udp 协议的 6004 端口,这里你可以新增和修改接收日志的协议和端口,如 tcp、http 等常用协议。

input {
        udp {
                port => 1234
                codec => "json"
        }
        udp {
                port => 6004
                codec => "json"
        }
}

filter
过滤,logstash 处理的规则主要就是在该模块中进行编辑

filter {
        translate {
                field => "action"
                dictionary => [ "w", "窗口",
                    "mr", "鼠标",
                    "ml", "鼠标",
                    "mc", "鼠标",
                    "inputbox", "输入框",
                                        "cmd", "命令",
                                        "net", "网络",
                                        "web", "网页",
                                        "fcreate", "新建",
                                        "fopen", "打开",
                                        "fdel", "删除",
                                        "fmodify", "修改",
                                        "frename", "重命名",
                                        "fcopy", "文件复制",
                                        "print", "打印",
                                        "fcloud", "云盘",
                                        "ftp", "ftp",
                                        "mail", "邮件",
                                        "im", "通讯",
                                        "shell", "shell命令",
                                        "fmount", "移动介质",
                                        "bevent", "页面事件",
                                        "cp", "COPY",
                                        "robot", "RAP流程",
                                        "screenshot", "截屏"]
                fallback => "%{action}"
                destination => "action_s"
        }
}

output
输出,配置将处理后的数据输出到什么地方,可以是 el 的索引文件,也可以是本地文件,也可以是 redis,也可以是邮件等

output {
        elasticsearch {
                hosts => ["127.0.0.1:6200"]
                index => "ueba-log-index"
                document_type => "ueba-log"
        }
}

下面来介绍一些常用的配置介绍:


校验配置文件有效性

如果 logstash 配置文件中语法不对,是无法重启成果 logstash 服务的,所以可以先用该命令尝试是否没有语法错误

.../logstash/bin/logstash -t -f .../isa/conf/logstash/logstash.conf

通过匹配到字段的内容,新建一个字段并赋值(filter 中配置)

if [content] =~ "cmd" {
	mutate { add_field => { "cmd_lable" => "运维"  } }
}else if [content] =~ "sss" {
	mutate { add_field => { "cmd_lable" => "运维sss"  } }
}else if [content] =~ "xxx" {
	mutate { add_field => { "cmd_lablexxx" => "运维xxx"  } }
}

content 字段中包含 cmd,日志中增加字段:cmd_lable,显示内容:运维
content 字段中包含 sss,日志中增加字段:cmd_lable,显示内容:运维 sss
content 字段中包含 xxx,日志中增加字段:cmd_lablexxx,显示内容:运维 xxx
若为数值类型字段时,则 [content] == 119 即可,不需要加”"


通过匹配到字段的内容,新建一个字段并赋值(filter 中配置)

以下这个案例中,完成的内容是:我们在 tag_category 为 T_START 的日志中会采集到很多的字段信息,想把这些信息经过缓存、处理、计算,赋值给其它的日志。
比如开始标签中会采集到座席姓名、手机号码等信息,但是结束标签是采集不到的,通过缓存把开始标签中的座席姓名、手机号码等字段,赋值给结束标签。
再比如用来计算开始标签和结束标签之间的时长。

filter {
     #当检测到tag_category字段的值为T_START的日志,则进入该aggregate
     if "T_START" == [tag_category] {
          aggregate {
          #这里可设定一个唯一值的字段,用来标识
          task_id => "%{session_no}"
          code =>  "
               #缓存中生成t_seat_uname(座席姓名),使用日志中已有t_seat_uname的值赋予
               map['t_seat_uname'] = event.get('t_seat_uname’)
			   
               #缓存中生成t_phone_num(电话号码),使用日志中已有t_phone_num的值赋予
               map['t_phone_num'] = event.get('t_phone_num’)
			   
               #缓存中生成t_tags_num(业务标签数),初始为0
               map['t_tags_num'] = 0
			   
               #缓存中生成t_tags(业务标签数组),初始为数组
               map['t_tags'] = Array.new
                "
         map_action => "create"
         }
     }
     #当检测到tag_category字段的值不为T_STOP同时也不为T_START的日志,则进入该aggregate
     if "T_STOP" != [tag_category] and "T_START" != [tag_category] {
          aggregate {
          task_id => "%{session_no}"
          code => "
               #当action为tag(业务标签)时,缓存中的t_tags_num字段值+1
               map['t_tags_num'] +=1 if event.get('action') == 'tag’
			   
               #当action为tag(业务标签)时,将tag_name加入到t_tags的数组中
               map['t_tags'].push(event.get('tag_name')) if event.get('action') == ‘tag'
			   
               #日志生成新字段t_seat_uname
               event.set('t_seat_uname' , map['t_seat_uname'])
			   
               #日志生成新字段t_phone_num
               event.set('t_phone_num' , map['t_phone_num'])
               "
         map_action => "update"
         }
    }
     #当检测到tag_category字段的值为T_STOP的日志,则进入该aggregate
     if "T_STOP" == [tag_category] {
          aggregate {
          task_id => "%{session_no}"
          code => "
               #缓存t_duration获取总时长(秒),当前offsettime偏移时间-会话开始时的t_first_offsettime偏移时间
               map['t_duration'] = event.get('offsettime')-map['t_first_offsettime']
			   
               #日志生成新字段t_seat_uname
               event.set('t_seat_uname' , map['t_seat_uname'])
			   
               #t_duration转化为秒
               event.set('t_duration' , (map['t_duration']/1000).round(2))
			   
               #日志生成新字段t_tags_num
               event.set('t_tags_num' , map['t_tags_num'])
			   
               #日志生成新字段t_tags
               event.set('t_tags' , map['t_tags'])
               "
         map_action => "update"
         end_of_task => true
        }
    }

输出日志到本地时间命名的文件中(output 中配置)`

output {
        elasticsearch {
                hosts => ["127.0.0.1"]
                index => "ueba-log-index"
                document_type => "ueba-log"
        }
        file{
                path => "/test/isa/conf/logstash-file-%{+YYYY.MM.DD}"
        }

}

丢弃相关类型日志(output 中配置)

有些用户不需要的日志可以丢弃掉,减少存储及并发处理的压力

output {
        if [action] != 'k' and [action] != 'web' and [action] != 'ml'
        {
        elasticsearch {
                hosts => ["127.0.0.1"]
                index => "ueba-log-index"
                document_type => "ueba-log"
        }
        }

}