Logstash 中如何使用条件判断语句,有哪些常见的条件操作符?
Logstash 支持条件判断语句,可以根据字段值、标签或其他条件来控制数据流。这使得我们能够对不同的数据应用不同的处理逻辑。条件判断语法Logstash 支持以下条件操作符:比较操作符==:等于!=:不等于<:小于>:大于<=:小于等于>=:大于等于逻辑操作符and:逻辑与or:逻辑或nand:逻辑与非xor:逻辑异或not:逻辑非正则表达式=~:匹配正则表达式!~:不匹配正则表达式包含操作in:包含在数组中not in:不包含在数组中基本条件判断if 语句filter { if [type] == "apache" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } }}if-else 语句filter { if [type] == "apache" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } else if [type] == "nginx" { grok { match => { "message" => "%{NGINXACCESS}" } } } else { grok { match => { "message" => "%{COMMONAPACHELOG}" } } }}字段值判断字符串比较filter { if [log_level] == "ERROR" { mutate { add_tag => ["error_log"] } }}数值比较filter { if [response] >= 400 { mutate { add_tag => ["http_error"] } }}正则表达式匹配filter { if [message] =~ /exception/i { mutate { add_tag => ["exception"] } }}标签判断检查标签是否存在filter { if "error" in [tags] { # 处理错误日志 }}添加标签filter { if [status] >= 500 { mutate { add_tag => ["server_error"] } } else if [status] >= 400 { mutate { add_tag => ["client_error"] } }}复杂条件判断多条件组合filter { if [type] == "apache" and [response] >= 400 { mutate { add_tag => ["apache_error"] } }}使用括号分组filter { if ([type] == "apache" or [type] == "nginx") and [response] >= 400 { mutate { add_tag => ["web_error"] } }}嵌套条件filter { if [type] == "apache" { if [response] >= 500 { mutate { add_tag => ["apache_server_error"] } } else if [response] >= 400 { mutate { add_tag => ["apache_client_error"] } } }}在 Input 中使用条件判断input { file { path => "/var/log/*.log" type => "system" } if [type] == "system" { file { path => "/var/log/syslog" type => "syslog" } }}在 Filter 中使用条件判断根据字段值应用不同的过滤器filter { if [type] == "json" { json { source => "message" } } else { grok { match => { "message" => "%{COMMONAPACHELOG}" } } }}处理解析失败filter { grok { match => { "message" => "%{PATTERN:field}" } tag_on_failure => ["_grokparsefailure"] } if "_grokparsefailure" in [tags] { # 处理解析失败的情况 mutate { add_field => { "parse_error" => "true" } } }}在 Output 中使用条件判断根据日志级别路由到不同的索引output { if [log_level] == "ERROR" { elasticsearch { hosts => ["http://localhost:9200"] index => "error-logs-%{+YYYY.MM.dd}" } } else { elasticsearch { hosts => ["http://localhost:9200"] index => "access-logs-%{+YYYY.MM.dd}" } }}多输出条件路由output { # 错误日志发送到专门的索引 if [level] == "ERROR" { elasticsearch { hosts => ["http://localhost:9200"] index => "errors-%{+YYYY.MM.dd}" } } # 访问日志发送到 Kafka if [type] == "access" { kafka { bootstrap_servers => "localhost:9092" topic_id => "access-logs" } } # 所有日志都输出到文件备份 file { path => "/backup/all-logs.log" }}字段存在性检查检查字段是否存在filter { if [user_id] { # user_id 字段存在 mutate { add_tag => ["has_user_id"] } }}检查字段是否为空filter { if [user_id] and [user_id] != "" { # user_id 字段存在且不为空 mutate { add_tag => ["valid_user_id"] } }}数组操作检查数组是否包含元素filter { if "error" in [tags] { # tags 数组包含 "error" }}检查数组长度filter { if [tags] and [tags].length > 0 { # tags 数组不为空 }}最佳实践使用条件判断提高性能:避免对不必要的数据进行处理合理使用标签:使用标签标记不同类型的数据处理异常情况:使用条件判断处理解析失败等异常代码可读性:使用括号和缩进提高配置文件的可读性测试验证:使用测试数据验证条件判断逻辑实际应用示例完整的日志处理流程input { file { path => "/var/log/app/*.log" start_position => "beginning" }}filter { # 根据日志类型应用不同的解析逻辑 if [message] =~ /^\{/ { # JSON 格式日志 json { source => "message" } } else { # 文本格式日志 grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}" } } } # 根据日志级别添加标签 if [level] == "ERROR" or [level] == "FATAL" { mutate { add_tag => ["error", "alert"] } } # 解析时间戳 if [timestamp] { date { match => ["timestamp", "ISO8601"] } }}output { # 错误日志发送到专门的索引 if "alert" in [tags] { elasticsearch { hosts => ["http://localhost:9200"] index => "alerts-%{+YYYY.MM.dd}" } } else { elasticsearch { hosts => ["http://localhost:9200"] index => "logs-%{+YYYY.MM.dd}" } }}