How do you use conditional statements in Logstash, and what are the common conditional operators?

February 21, 15:52

Logstash supports conditional statements that control how events flow through a pipeline based on field values, tags, or other conditions. This lets different events receive different processing logic.

Conditional Statement Syntax

Logstash supports the following conditional operators:

Comparison Operators

  • ==: Equal to
  • !=: Not equal to
  • <: Less than
  • >: Greater than
  • <=: Less than or equal to
  • >=: Greater than or equal to

Logical Operators

  • and: Logical AND
  • or: Logical OR
  • nand: Logical NAND
  • xor: Logical XOR
  • !: Logical NOT (unary; negates the expression that follows)
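
The less common operators can be sketched as follows. Negation is written with a leading !. The field names ([env], [from_cache], [from_origin]) and the tag names are hypothetical and serve only as illustration.

```conf
filter {
  # ! negates the parenthesized expression that follows it
  if !([env] == "production") {
    mutate { add_tag => ["non_production"] }
  }

  # xor is true when exactly one side is true
  if [from_cache] xor [from_origin] {
    mutate { add_tag => ["single_source"] }
  }

  # nand is true unless both sides are true
  if [from_cache] nand [from_origin] {
    mutate { add_tag => ["not_both_sources"] }
  }
}
```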

Regular Expressions

  • =~: Matches regular expression
  • !~: Does not match regular expression

Inclusion Operators

  • in: Contained in an array (or, for strings, contained as a substring)
  • not in: Not contained in an array (or string)
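
A minimal sketch of both inclusion operators, assuming events carry a [log_level] field and the usual [tags] array. The "severe" and "parsed" tag names are made up for the example.

```conf
filter {
  # Field value is one of several literal values
  if [log_level] in ["ERROR", "FATAL"] {
    mutate { add_tag => ["severe"] }
  }

  # Tag is absent from the tags array
  if "_grokparsefailure" not in [tags] {
    mutate { add_tag => ["parsed"] }
  }
}
```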

Basic Conditional Statements

if Statement

conf
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
}

if-else Statement

conf
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  } else {
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
  }
}

Field Value Comparison

String Comparison

conf
filter {
  if [log_level] == "ERROR" {
    mutate {
      add_tag => ["error_log"]
    }
  }
}

Numeric Comparison

conf
filter {
  if [response] >= 400 {
    mutate {
      add_tag => ["http_error"]
    }
  }
}
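
Numeric comparisons expect a numeric field, and grok captures are strings unless the pattern requests a type. A sketch of one way to handle this, assuming [response] arrived as a string, is to convert it with mutate before comparing:

```conf
filter {
  # Convert the captured status code from string to integer
  mutate {
    convert => { "response" => "integer" }
  }

  if [response] >= 400 {
    mutate { add_tag => ["http_error"] }
  }
}
```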

Regular Expression Matching

conf
filter {
  if [message] =~ /exception/i {
    mutate {
      add_tag => ["exception"]
    }
  }
}
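
The negated form !~ works the same way. The sketch below is illustrative only: the "healthcheck" string and the date-prefix pattern are assumptions about the log format, not something the pipeline above requires.

```conf
filter {
  # Drop health-check lines before any further processing
  if [message] =~ /healthcheck/ {
    drop { }
  }

  # Tag lines that do not start with a YYYY-MM-DD date
  if [message] !~ /^\d{4}-\d{2}-\d{2}/ {
    mutate { add_tag => ["no_timestamp"] }
  }
}
```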

Tag Checking

Check if Tag Exists

conf
filter {
  if "error" in [tags] {
    # Process error logs
  }
}

Add Tags

conf
filter {
  if [status] >= 500 {
    mutate {
      add_tag => ["server_error"]
    }
  } else if [status] >= 400 {
    mutate {
      add_tag => ["client_error"]
    }
  }
}

Complex Conditional Statements

Multiple Conditions

conf
filter {
  if [type] == "apache" and [response] >= 400 {
    mutate {
      add_tag => ["apache_error"]
    }
  }
}

Using Parentheses for Grouping

conf
filter {
  if ([type] == "apache" or [type] == "nginx") and [response] >= 400 {
    mutate {
      add_tag => ["web_error"]
    }
  }
}

Nested Conditions

conf
filter {
  if [type] == "apache" {
    if [response] >= 500 {
      mutate {
        add_tag => ["apache_server_error"]
      }
    } else if [response] >= 400 {
      mutate {
        add_tag => ["apache_client_error"]
      }
    }
  }
}

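Conditional Statements and the Input Section

Conditionals are evaluated against events, and no event exists until an input has produced one, so if blocks do not work inside the input section. Instead, set type (or tags) on each input and branch on that value in the filter or output section.

conf
input {
  file {
    path => "/var/log/*.log"
    type => "system"
  }
  file {
    path => "/var/log/syslog"
    type => "syslog"
  }
}
filter {
  if [type] == "syslog" {
    mutate { add_tag => ["syslog"] }
  }
}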

Using Conditional Statements in Filter

Apply Different Filters Based on Field Values

conf
filter {
  if [type] == "json" {
    json {
      source => "message"
    }
  } else {
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
  }
}

Handle Parse Failures

conf
filter {
  grok {
    match => { "message" => "%{PATTERN:field}" }
    tag_on_failure => ["_grokparsefailure"]
  }

  if "_grokparsefailure" in [tags] {
    # Handle parse failure
    mutate {
      add_field => { "parse_error" => "true" }
    }
  }
}

Using Conditional Statements in Output

Route to Different Indexes Based on Log Level

conf
output {
  if [log_level] == "ERROR" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "error-logs-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "access-logs-%{+YYYY.MM.dd}"
    }
  }
}

Multi-output Conditional Routing

conf
output {
  # Error logs sent to dedicated index
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  }

  # Access logs sent to Kafka
  if [type] == "access" {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => "access-logs"
    }
  }

  # All logs output to file backup
  file {
    path => "/backup/all-logs.log"
  }
}

Field Existence Checking

Check if Field Exists

conf
filter {
  if [user_id] {
    # user_id field exists
    mutate {
      add_tag => ["has_user_id"]
    }
  }
}
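
The inverse check uses the ! operator. Note that a bare field reference tests truthiness rather than pure existence, so a field holding false or null is treated like a missing one. A minimal sketch follows; the "anonymous" default value is a made-up placeholder.

```conf
filter {
  # ![user_id] is true when the field is absent, null, or false
  if ![user_id] {
    mutate {
      add_field => { "user_id" => "anonymous" }
    }
  }
}
```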

Check if Field is Empty

conf
filter {
  if [user_id] and [user_id] != "" {
    # user_id field exists and is not empty
    mutate {
      add_tag => ["valid_user_id"]
    }
  }
}

Array Operations

Check if Array Contains Element

conf
filter {
  if "error" in [tags] {
    # tags array contains "error"
  }
}

Check Array Length

conf
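filter {
  # Conditionals have no length operator, so compute the size in a ruby filter
  ruby {
    code => "event.set('[@metadata][tags_count]', (event.get('tags') || []).length)"
  }

  if [@metadata][tags_count] > 0 {
    # tags array is not empty
  }
}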

Best Practices

  1. Filter Early to Improve Performance: Use conditionals so that expensive filters such as grok run only on the events that need them
  2. Use Tags Sensibly: Tag events by type or state so that later stages can branch on those tags
  3. Handle Failure Cases: Use conditionals to catch parse failures (for example, the _grokparsefailure tag) and other exceptions
  4. Keep Configurations Readable: Use parentheses and consistent indentation
  5. Test and Verify: Run sample events through the pipeline to verify the conditional logic
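
One way to exercise conditional logic in isolation is a small throwaway pipeline that feeds fixed lines through the filter and prints the resulting events. The sketch below assumes the generator input and rubydebug codec are available (both ship with standard Logstash distributions). The sample lines and the file name test.conf are hypothetical.

```conf
input {
  generator {
    # Each line becomes one event; count => 1 emits the set once and stops
    lines => [
      '2024-01-01T00:00:00Z [ERROR] disk full',
      '2024-01-01T00:00:01Z [INFO] started'
    ]
    count => 1
  }
}

filter {
  # The conditional under test
  if [message] =~ /\[(ERROR|FATAL)\]/ {
    mutate { add_tag => ["error"] }
  }
}

output {
  # Print each event with all fields and tags for inspection
  stdout { codec => rubydebug }
}
```

Running it with bin/logstash -f test.conf should show the error tag on the first event only. Adding --config.test_and_exit checks the configuration syntax without processing any events.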

Real-world Application Example

Complete Log Processing Flow

conf
input {
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
  }
}

filter {
  # Apply different parsing logic based on log format
  if [message] =~ /^\{/ {
    # JSON format log
    json {
      source => "message"
    }
  } else {
    # Text format log
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}" }
    }
  }

  # Add tags based on log level
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_tag => ["error", "alert"]
    }
  }

  # Parse timestamp
  if [timestamp] {
    date {
      match => ["timestamp", "ISO8601"]
    }
  }
}

output {
  # Error logs sent to dedicated index
  if "alert" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "alerts-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
Tags: Logstash