如何从零编写一个完整的 Logstash 配置文件?
当你在凌晨两点被叫醒,因为日志管道断了——那一刻你会意识到,理解 Logstash 配置文件的每一行到底在做什么,不是锦上添花,而是生存技能。
Logstash 配置文件的三段式骨架
Logstash 的配置文件本质上只做三件事:从哪读数据、怎么处理数据、往哪写数据。对应的就是 input、filter、output 三个区块,数据像流水线一样依次穿过它们。
confinput { # 数据从哪里来 } filter { # 数据怎么加工(可选) } output { # 数据到哪里去 }
其中 input 和 output 是必需的,filter 可以省略。三个区块的声明顺序固定为 input-filter-output,但 Logstash 并不强制——只是惯例如此,调换位置也能运行,只是阅读和维护时会非常混乱。
input:数据的入口
input 插件决定数据源的类型和接入方式。以下是生产环境最常用的四种。
file 插件——读取本地日志
confinput { file { path => "/var/log/nginx/access.log" start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" tags => ["nginx"] } }
start_position 控制首次读取是从文件头还是文件尾开始;sincedb_path 记录已读取的文件偏移量,避免重启后重复消费。如果想在测试时每次都从头读,把 sincedb_path 设为 /dev/null。
beats 插件——接收 Elastic Agent 数据
confinput { beats { port => 5044 ssl => true ssl_certificate => "/etc/logstash/certs/logstash.crt" ssl_key => "/etc/logstash/certs/logstash.key" } }
这是 Elastic Stack 生态中最主流的采集方式。Filebeat、Metricbeat 等轻量采集器将数据推送到这个端口,Logstash 作为中继做进一步加工。
kafka 插件——消费消息队列
confinput { kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["app-logs", "access-logs"] group_id => "logstash-consumer" consumer_threads => 4 decorate_events => true } }
从 Kafka 消费数据适合高吞吐、解耦的场景。decorate_events 为 true 时会在事件中添加 Kafka 元数据(topic、partition、offset),便于后续追溯。
http 插件——接收 HTTP 推送
confinput { http { port => 8080 codec => json additional_codecs => { "application/json" => "json" } } }
适用于应用主动推送 JSON 日志的场景,比如 Webhook 回调或自定义 SDK 上报。
filter:数据的加工车间
filter 是 Logstash 最有价值的部分,负责把非结构化的原始数据变成可查询的结构化字段。
grok 插件——正则解析日志
conffilter { grok { match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status:int} %{NUMBER:duration:float}ms" } overwrite => ["message"] } }
grok 基于正则表达式,但提供了大量预定义模式(如 %{IP}、%{TIMESTAMP_ISO8601}、%{GREEDYDATA}),避免从零写正则。解析失败时会自动添加 _grokparsefailure 标签,可以据此做告警。
常用内置模式速查:
| 模式 | 匹配内容 | 示例 |
|---|---|---|
%{IP} | IPv4/IPv6 地址 | 192.168.1.1 |
%{HOSTNAME} | 主机名 | web-server-01 |
%{TIMESTAMP_ISO8601} | ISO 时间戳 | 2024-01-15T10:30:00 |
%{GREEDYDATA} | 贪婪匹配剩余内容 | 任意字符串 |
%{QUOTEDSTRING} | 带引号字符串 | "hello world" |
mutate 插件——字段变换
conffilter { mutate { rename => { "resp_code" => "http_status" } remove_field => ["headers", "cookies"] lowercase => ["request_path"] strip => ["user_input"] copy => { "source_ip" => "client_ip" } } }
mutate 做的是"脏活":重命名字段让语义更清晰、删掉无用字段减少存储、大小写转换统一格式。这些操作琐碎但直接影响后续查询体验。
date 插件——时间戳解析
conffilter { date { match => ["log_time", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" timezone => "Asia/Shanghai" } }
date 插件将日志中的时间字符串解析为 Logstash 事件的标准 @timestamp 字段。这一步至关重要——如果没有正确解析时间,Elasticsearch 中的时序查询和索引路由都会出错。注意 match 的格式必须与日志中的实际格式完全对应,否则静默失败。
output:数据的出口
elasticsearch 插件——写入 ES
confoutput { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "app-logs-%{+YYYY.MM.dd}" user => "elastic" password => "${ES_PASSWORD}" ssl => true ssl_certificate_verification => false action => "create" } }
index 中的 %{+YYYY.MM.dd} 会根据事件的 @timestamp 动态生成按天分割的索引名。action => "create" 保证相同文档 ID 只写入一次,避免重复。密码等敏感信息用 ${ENV_VAR} 从环境变量读取,不要硬编码在配置文件里。
file 插件——落盘归档
confoutput { file { path => "/data/archive/%{type}-%{+YYYY-MM-dd}.log" codec => line { format => "%{message}" } } }
适合需要将处理后的数据持久化到文件系统的场景,如审计日志归档、合规数据留存。
stdout 插件——调试利器
confoutput { stdout { codec => rubydebug } }
rubydebug codec 会以结构化的可读格式输出事件的全部字段,是排查配置问题的第一工具。调试时可以用它替代正式 output,确认 filter 解析结果正确后再切换回去。
条件判断:让配置具备分支逻辑
实际场景中,不同来源的日志需要不同的处理路径。Logstash 支持在 filter 和 output 中使用 if/else if/else 条件判断。
conffilter { if [type] == "nginx" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } mutate { remove_field => ["message"] } } else if [type] == "app" { json { source => "message" } date { match => ["timestamp", "ISO8601"] } } else { drop { } } } output { if "_grokparsefailure" in [tags] { file { path => "/var/log/logstash/parse-failed-%{+YYYY-MM-dd}.log" } } elasticsearch { hosts => ["localhost:9200"] index => "%{type}-%{+YYYY.MM.dd}" } }
条件表达式支持的比较运算符:==、!=、<、>、<=、>=,以及正则匹配 =~ 和 !~。逻辑运算符为 and、or、not。可以用 [field] 引用事件字段,in 判断数组包含关系。
一个实用模式:将 grok 解析失败的事件单独输出到文件,既不丢数据,又不污染主索引。
多管道配置:隔离不同的数据流
当一条 Logstash 实例需要处理多种互不相干的数据流时,用多管道(pipelines)替代单管道内的条件分支会更清晰。
在 config/pipelines.yml 中定义:
yaml- pipeline.id: nginx-pipeline path.config: "/etc/logstash/conf.d/nginx.conf" pipeline.workers: 4 pipeline.batch.size: 250 - pipeline.id: app-pipeline path.config: "/etc/logstash/conf.d/app.conf" pipeline.workers: 2 pipeline.batch.size: 125
每个管道有独立的配置文件、worker 线程数和 batch 大小,互不干扰。如果某个管道的 filter 出了问题,不会拖垮其他管道。
管道之间还可以通过 pipeline input/output 插件通信:
conf# 管道 A 的 output output { pipeline { send_to => [enrichment] } } # 管道 B 的 input input { pipeline { address => enrichment } }
性能调优的关键参数
配置写对了只是第一步,跑得稳才是生产环境的要求。
pipeline.workers
默认值是 CPU 核心数。对于 CPU 密集型的 filter(尤其是 grok),不要盲目调大——worker 过多会导致上下文切换开销增大。一般设为 CPU 核数或略低即可。
pipeline.batch.size
每次批量处理的事件数,默认 125。调大可以提高吞吐量,但会增加内存占用和单次处理延迟。对于 grok 较重的场景,建议从 125 开始逐步调到 250-500 观察效果。
pipeline.batch.delay
批次等待时间,默认 50ms。降低这个值可以减少延迟,但可能让批次更小、吞吐下降。对延迟敏感的场景可以调到 10-20ms。
queue.type
默认是内存队列(memory),重启丢数据。生产环境建议用持久化队列(persisted):
confqueue.type: persisted path.queue: /data/logstash/queue queue.max_bytes: 4gb
持久化队列将事件写入磁盘,Logstash 异常退出后可以从断点恢复,代价是吞吐量下降约 10-20%。
grok 的性能陷阱
grok 是 Logstash 中最耗 CPU 的插件。两个优化方向:
一、将多个 grok match 拆成按条件分支执行,避免每条事件都跑完所有正则:
conffilter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGLINE}" } } } else if [type] == "apache" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } }
二、用 patterns_dir 加载自定义模式,将复杂正则拆分成命名片段,既提升可读性也便于缓存复用。
一个完整的配置示例
以下是一个涵盖了上述所有要点的生产级配置:
confinput { beats { port => 5044 type => "beats" } kafka { bootstrap_servers => "kafka1:9092,kafka2:9092" topics => ["app-logs"] group_id => "logstash-consumer" consumer_threads => 3 decorate_events => true type => "kafka-app" } } filter { if [type] == "beats" and "nginx" in [tags] { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } mutate { remove_field => ["message", "prospector"] } } else if [type] == "kafka-app" { json { source => "message" } date { match => ["timestamp", "ISO8601"] target => "@timestamp" } mutate { rename => { "lvl" => "log_level" } lowercase => ["log_level"] } } if "_grokparsefailure" in [tags] or "_jsonparsefailure" in [tags] { mutate { add_field => { "parse_error" => "true" } } } } output { if [parse_error] == "true" { file { path => "/var/log/logstash/failed-%{+YYYY-MM-dd}.log" codec => line { format => "%{message}" } } } else { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] index => "%{type}-%{+YYYY.MM.dd}" user => "elastic" password => "${ES_PASSWORD}" } } stdout { codec => rubydebug } }
理解 Logstash 配置的关键不是记住多少插件参数,而是建立起 input-filter-output 的思维模型:数据从哪来、到哪去、中间怎么变。在这个框架下,每个插件只是填空题。遇到问题时,按这个顺序排查:数据进来了吗(查 input 日志)?字段解析对了吗(用 stdout + rubydebug 看)?写进目标了吗(查 output 日志和目标系统)?三步定位,比盲目改配置高效得多。