Logstash supports conditional statements that control how events flow through a pipeline based on field values, tags, or other conditions. This lets us apply different processing logic to different kinds of events.
Conditional Statement Syntax
Logstash supports the following conditional operators:
Comparison Operators
- `==`: Equal to
- `!=`: Not equal to
- `<`: Less than
- `>`: Greater than
- `<=`: Less than or equal to
- `>=`: Greater than or equal to
Logical Operators
- `and`: Logical AND
- `or`: Logical OR
- `nand`: Logical NAND
- `xor`: Logical XOR
- `!`: Logical NOT (unary negation; `not` is only valid as part of `not in`)
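The less common operators are easiest to read in context. The following is a minimal sketch; the field names `[user]`, `[session_id]`, and `[api_key]` and all tag names are hypothetical, chosen only for illustration. Note that negation is written with `!`.

```conf
filter {
  # nand: true unless BOTH fields are present
  if [user] nand [session_id] {
    mutate { add_tag => ["incomplete_identity"] }
  }

  # xor: true when exactly one of the two fields is present
  if [api_key] xor [session_id] {
    mutate { add_tag => ["single_credential"] }
  }

  # !: unary negation of a parenthesized expression
  if !([type] == "apache" or [type] == "nginx") {
    mutate { add_tag => ["non_web"] }
  }
}
```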
Regular Expressions
- `=~`: Matches regular expression
- `!~`: Does not match regular expression
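As a sketch of the negated form, the following tags events whose message does not start with a common HTTP method. The regular expression and the tag name `non_http` are illustrative assumptions.

```conf
filter {
  # !~ is true when the pattern does NOT match the field value
  if [message] !~ /^(GET|POST|PUT|DELETE) / {
    mutate { add_tag => ["non_http"] }
  }
}
```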
Inclusion Operators
- `in`: Contained in array
- `not in`: Not contained in array
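Both inclusion operators can be sketched briefly. The first condition compares a field against a list of literal values, and the second checks that a tag is absent. The field `[log_level]` and the tag names are assumptions for illustration.

```conf
filter {
  # in: compare one field against a list of literal values
  if [log_level] in ["ERROR", "FATAL"] {
    mutate { add_tag => ["alert"] }
  }

  # not in: true when the tags array lacks the given element
  if "_grokparsefailure" not in [tags] {
    mutate { add_tag => ["parsed"] }
  }
}
```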
Basic Conditional Statements
if Statement
```conf
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
}
```
if-else Statement
```conf
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    # NGINXACCESS is assumed to be a custom pattern (for example, loaded via patterns_dir)
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  } else {
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
  }
}
```
Field Value Comparison
String Comparison
```conf
filter {
  if [log_level] == "ERROR" {
    mutate {
      add_tag => ["error_log"]
    }
  }
}
```
Numeric Comparison
```conf
filter {
  if [response] >= 400 {
    mutate {
      add_tag => ["http_error"]
    }
  }
}
```
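Grok captures are strings unless a type is requested, so a numeric comparison may not behave as intended on a freshly parsed field. A minimal sketch, assuming the status code is stored in `[response]` as in the example above, converts the field to an integer before comparing:

```conf
filter {
  # Turn the string captured by grok into an integer before comparing
  mutate {
    convert => { "response" => "integer" }
  }

  if [response] >= 400 {
    mutate { add_tag => ["http_error"] }
  }
}
```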
Regular Expression Matching
```conf
filter {
  if [message] =~ /exception/i {
    mutate {
      add_tag => ["exception"]
    }
  }
}
```
Tag Checking
Check if Tag Exists
```conf
filter {
  if "error" in [tags] {
    # Process error logs
  }
}
```
Add Tags
```conf
filter {
  if [status] >= 500 {
    mutate {
      add_tag => ["server_error"]
    }
  } else if [status] >= 400 {
    mutate {
      add_tag => ["client_error"]
    }
  }
}
```
Complex Conditional Statements
Multiple Conditions
```conf
filter {
  if [type] == "apache" and [response] >= 400 {
    mutate {
      add_tag => ["apache_error"]
    }
  }
}
```
Using Parentheses for Grouping
```conf
filter {
  if ([type] == "apache" or [type] == "nginx") and [response] >= 400 {
    mutate {
      add_tag => ["web_error"]
    }
  }
}
```
Nested Conditions
```conf
filter {
  if [type] == "apache" {
    if [response] >= 500 {
      mutate {
        add_tag => ["apache_server_error"]
      }
    } else if [response] >= 400 {
      mutate {
        add_tag => ["apache_client_error"]
      }
    }
  }
}
```
Using Conditional Statements in Input
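Conditionals are not supported inside the input section, because no event exists yet for a condition to test. Instead, set type (or tags) on each input and branch on that value in the filter section:
```conf
input {
  file {
    path => "/var/log/*.log"
    type => "system"
  }
  file {
    path => "/var/log/syslog"
    type => "syslog"
  }
}

filter {
  if [type] == "syslog" {
    mutate { add_tag => ["syslog"] }
  }
}
```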
Using Conditional Statements in Filter
Apply Different Filters Based on Field Values
```conf
filter {
  if [type] == "json" {
    json {
      source => "message"
    }
  } else {
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
  }
}
```
Handle Parse Failures
```conf
filter {
  grok {
    match => { "message" => "%{PATTERN:field}" }
    tag_on_failure => ["_grokparsefailure"]
  }

  if "_grokparsefailure" in [tags] {
    # Handle parse failure
    mutate {
      add_field => { "parse_error" => "true" }
    }
  }
}
```
Using Conditional Statements in Output
Route to Different Indexes Based on Log Level
```conf
output {
  if [log_level] == "ERROR" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "error-logs-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "access-logs-%{+YYYY.MM.dd}"
    }
  }
}
```
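An alternative sketch avoids repeating the `elasticsearch` block: compute the index prefix in the filter stage and store it under `[@metadata]`, which is available to the pipeline but is not written to the output as part of the event. The field name `[@metadata][index_prefix]` is a hypothetical choice.

```conf
filter {
  if [log_level] == "ERROR" {
    mutate { add_field => { "[@metadata][index_prefix]" => "error-logs" } }
  } else {
    mutate { add_field => { "[@metadata][index_prefix]" => "access-logs" } }
  }
}

output {
  # One output block; the index name is resolved per event
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
  }
}
```

This keeps the routing decision in one place, which can be easier to maintain once more than two destinations are involved.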
Multi-output Conditional Routing
```conf
output {
  # Error logs sent to dedicated index
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  }

  # Access logs sent to Kafka
  if [type] == "access" {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => "access-logs"
    }
  }

  # All logs output to file backup
  file {
    path => "/backup/all-logs.log"
  }
}
```
Field Existence Checking
Check if Field Exists
```conf
filter {
  if [user_id] {
    # user_id field exists and is neither null nor false
    mutate {
      add_tag => ["has_user_id"]
    }
  }
}
```
Check if Field is Empty
```conf
filter {
  if [user_id] and [user_id] != "" {
    # user_id field exists and is not empty
    mutate {
      add_tag => ["valid_user_id"]
    }
  }
}
```
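The opposite check, for a field that is absent, can be sketched with the `!` operator. The tag name `missing_user_id` is an assumption for illustration.

```conf
filter {
  # ![field] is true when the field is missing, null, or false
  if ![user_id] {
    mutate { add_tag => ["missing_user_id"] }
  }
}
```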
Array Operations
Check if Array Contains Element
```conf
filter {
  if "error" in [tags] {
    # tags array contains "error"
  }
}
```
Check Array Length
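```conf
filter {
  # Conditionals cannot call methods such as .length, so compute the size first
  ruby {
    code => 'tags = event.get("tags"); event.set("[@metadata][tags_count]", tags.is_a?(Array) ? tags.length : 0)'
  }

  if [@metadata][tags_count] > 0 {
    # tags array is not empty
  }
}
```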
Best Practices
- Use Conditional Statements to Improve Performance: Wrap expensive filters such as grok in conditions so they run only on the events that need them
- Use Tags Sensibly: Mark different types of data with tags so that later stages can branch on them
- Handle Exception Cases: Use conditional statements to handle parsing failures and other exceptions
- Code Readability: Use parentheses and indentation to improve configuration file readability
- Testing and Verification: Use test data to verify conditional statement logic
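One way to follow the testing advice is a throwaway pipeline that reads test lines from standard input and prints the resulting events, so the tags added by each branch are visible. This is a sketch only; the file name `test.conf` is an assumption. The file can be syntax-checked with `bin/logstash -f test.conf --config.test_and_exit` and then run with `bin/logstash -f test.conf`.

```conf
input {
  # Type or paste sample log lines, one event per line
  stdin { }
}

filter {
  if [message] =~ /exception/i {
    mutate { add_tag => ["exception"] }
  }
}

output {
  # Print every event with all its fields so the effect of each condition is visible
  stdout { codec => rubydebug }
}
```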
Real-world Application Example
Complete Log Processing Flow
```conf
input {
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
  }
}

filter {
  # Apply different parsing logic based on log format
  if [message] =~ /^\{/ {
    # JSON format log
    json {
      source => "message"
    }
  } else {
    # Text format log
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}" }
    }
  }

  # Add tags based on log level
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_tag => ["error", "alert"]
    }
  }

  # Parse timestamp
  if [timestamp] {
    date {
      match => ["timestamp", "ISO8601"]
    }
  }
}

output {
  # Error logs sent to dedicated index
  if "alert" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "alerts-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
```