Logstash 支持条件判断语句,可以根据字段值、标签或其他条件来控制数据流。这使得我们能够对不同的数据应用不同的处理逻辑。
条件判断语法
Logstash 支持以下条件操作符:
比较操作符
==:等于!=:不等于<:小于>:大于<=:小于等于>=:大于等于
逻辑操作符
and:逻辑与or:逻辑或nand:逻辑与非xor:逻辑异或not:逻辑非
正则表达式
=~:匹配正则表达式!~:不匹配正则表达式
包含操作
in:包含在数组中not in:不包含在数组中
基本条件判断
if 语句
conffilter { if [type] == "apache" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } }
if-else 语句
conffilter { if [type] == "apache" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } else if [type] == "nginx" { grok { match => { "message" => "%{NGINXACCESS}" } } } else { grok { match => { "message" => "%{COMMONAPACHELOG}" } } } }
字段值判断
字符串比较
conffilter { if [log_level] == "ERROR" { mutate { add_tag => ["error_log"] } } }
数值比较
conffilter { if [response] >= 400 { mutate { add_tag => ["http_error"] } } }
正则表达式匹配
conffilter { if [message] =~ /exception/i { mutate { add_tag => ["exception"] } } }
标签判断
检查标签是否存在
conffilter { if "error" in [tags] { # 处理错误日志 } }
添加标签
conffilter { if [status] >= 500 { mutate { add_tag => ["server_error"] } } else if [status] >= 400 { mutate { add_tag => ["client_error"] } } }
复杂条件判断
多条件组合
conffilter { if [type] == "apache" and [response] >= 400 { mutate { add_tag => ["apache_error"] } } }
使用括号分组
conffilter { if ([type] == "apache" or [type] == "nginx") and [response] >= 400 { mutate { add_tag => ["web_error"] } } }
嵌套条件
conffilter { if [type] == "apache" { if [response] >= 500 { mutate { add_tag => ["apache_server_error"] } } else if [response] >= 400 { mutate { add_tag => ["apache_client_error"] } } } }
在 Input 中使用条件判断
confinput { file { path => "/var/log/*.log" type => "system" } if [type] == "system" { file { path => "/var/log/syslog" type => "syslog" } } }
在 Filter 中使用条件判断
根据字段值应用不同的过滤器
conffilter { if [type] == "json" { json { source => "message" } } else { grok { match => { "message" => "%{COMMONAPACHELOG}" } } } }
处理解析失败
conffilter { grok { match => { "message" => "%{PATTERN:field}" } tag_on_failure => ["_grokparsefailure"] } if "_grokparsefailure" in [tags] { # 处理解析失败的情况 mutate { add_field => { "parse_error" => "true" } } } }
在 Output 中使用条件判断
根据日志级别路由到不同的索引
confoutput { if [log_level] == "ERROR" { elasticsearch { hosts => ["http://localhost:9200"] index => "error-logs-%{+YYYY.MM.dd}" } } else { elasticsearch { hosts => ["http://localhost:9200"] index => "access-logs-%{+YYYY.MM.dd}" } } }
多输出条件路由
confoutput { # 错误日志发送到专门的索引 if [level] == "ERROR" { elasticsearch { hosts => ["http://localhost:9200"] index => "errors-%{+YYYY.MM.dd}" } } # 访问日志发送到 Kafka if [type] == "access" { kafka { bootstrap_servers => "localhost:9092" topic_id => "access-logs" } } # 所有日志都输出到文件备份 file { path => "/backup/all-logs.log" } }
字段存在性检查
检查字段是否存在
conffilter { if [user_id] { # user_id 字段存在 mutate { add_tag => ["has_user_id"] } } }
检查字段是否为空
conffilter { if [user_id] and [user_id] != "" { # user_id 字段存在且不为空 mutate { add_tag => ["valid_user_id"] } } }
数组操作
检查数组是否包含元素
conffilter { if "error" in [tags] { # tags 数组包含 "error" } }
检查数组长度
conffilter { if [tags] and [tags].length > 0 { # tags 数组不为空 } }
最佳实践
- 使用条件判断提高性能:避免对不必要的数据进行处理
- 合理使用标签:使用标签标记不同类型的数据
- 处理异常情况:使用条件判断处理解析失败等异常
- 代码可读性:使用括号和缩进提高配置文件的可读性
- 测试验证:使用测试数据验证条件判断逻辑
实际应用示例
完整的日志处理流程
confinput { file { path => "/var/log/app/*.log" start_position => "beginning" } } filter { # 根据日志类型应用不同的解析逻辑 if [message] =~ /^\{/ { # JSON 格式日志 json { source => "message" } } else { # 文本格式日志 grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}" } } } # 根据日志级别添加标签 if [level] == "ERROR" or [level] == "FATAL" { mutate { add_tag => ["error", "alert"] } } # 解析时间戳 if [timestamp] { date { match => ["timestamp", "ISO8601"] } } } output { # 错误日志发送到专门的索引 if "alert" in [tags] { elasticsearch { hosts => ["http://localhost:9200"] index => "alerts-%{+YYYY.MM.dd}" } } else { elasticsearch { hosts => ["http://localhost:9200"] index => "logs-%{+YYYY.MM.dd}" } } }