Logstash 有哪些常用的输入插件,如何配置文件输入和 Kafka 输入?
Logstash 支持多种输入插件,可以从各种数据源收集数据。以下是常用的输入插件及其使用方法。1. File 输入插件File 插件用于从文件系统读取日志文件。基本配置input { file { path => "/var/log/*.log" start_position => "beginning" sincedb_path => "/dev/null" }}重要参数path:要读取的文件路径,支持通配符start_position:开始读取的位置(beginning 或 end)sincedb_path:记录读取位置的文件路径type:为事件添加类型标识tags:为事件添加标签高级配置input { file { path => ["/var/log/apache/*.log", "/var/log/nginx/*.log"] exclude => ["*.gz", "*.zip"] start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" discover_interval => 15 stat_interval => 1 mode => "read" file_completed_action => "delete" file_completed_log_path => "/var/log/logstash/completed.log" }}2. Beats 输入插件Beats 插件用于接收来自 Beats(如 Filebeat、Metricbeat)的数据。基本配置input { beats { port => 5044 }}重要参数port:监听端口host:绑定地址ssl:启用 SSL/TLSclientinactivitytimeout:客户端不活动超时时间SSL 配置input { beats { port => 5044 ssl => true ssl_certificate => "/path/to/cert.pem" ssl_key => "/path/to/key.pem" ssl_certificate_authorities => ["/path/to/ca.pem"] ssl_verify_mode => "force_peer" }}3. Kafka 输入插件Kafka 插件用于从 Kafka 消息队列消费数据。基本配置input { kafka { bootstrap_servers => "localhost:9092" topics => ["logs"] group_id => "logstash-consumer" }}重要参数bootstrap_servers:Kafka 服务器地址topics:要消费的主题列表group_id:消费者组 IDconsumer_threads:消费者线程数decorate_events:添加 Kafka 元数据到事件高级配置input { kafka { bootstrap_servers => ["kafka1:9092", "kafka2:9092"] topics => ["app-logs", "system-logs"] group_id => "logstash-group" consumer_threads => 4 fetch_min_bytes => 1 fetch_max_wait_ms => 100 max_partition_fetch_bytes => 1048576 session_timeout_ms => 10000 auto_offset_reset => "latest" enable_auto_commit => false decorate_events => true codec => "json" }}4. HTTP 输入插件HTTP 插件通过 HTTP 接口接收数据。基本配置input { http { port => 8080 codec => "json" }}重要参数port:监听端口host:绑定地址codec:编解码器ssl:启用 SSL认证配置input { http { port => 8080 user => "admin" password => "secret" ssl => true ssl_certificate => "/path/to/cert.pem" ssl_key => "/path/to/key.pem" }}5. TCP/UDP 输入插件TCP/UDP 插件用于接收网络协议数据。TCP 配置input { tcp { port => 5000 codec => "json_lines" mode => "server" }}UDP 配置input { udp { port => 5001 codec => "json" workers => 2 }}6. Syslog 输入插件Syslog 插件用于接收系统日志。基本配置input { syslog { port => 514 type => "syslog" }}高级配置input { syslog { port => 514 host => "0.0.0.0" codec => "plain" use_rfc5424e => true grok_patterns => ["RSYSLOGBASE"] timezone => "UTC" }}7. JDBC 输入插件JDBC 插件用于从数据库读取数据。基本配置input { jdbc { jdbc_driver_library => "/path/to/mysql-connector.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "user" jdbc_password => "password" schedule => "* * * * *" statement => "SELECT * FROM logs WHERE created_at > :sql_last_value" }}重要参数jdbcdriverlibrary:JDBC 驱动程序路径jdbcdriverclass:JDBC 驱动类名jdbcconnectionstring:数据库连接字符串schedule:执行计划(cron 表达式)statement:SQL 查询语句usecolumnvalue:使用列值跟踪tracking_column:跟踪列名lastrunmetadata_path:元数据存储路径8. Redis 输入插件Redis 插件用于从 Redis 读取数据。基本配置input { redis { host => "localhost" port => 6379 data_type => "list" key => "logstash" }}数据类型list:列表类型channel:发布订阅频道pattern_channel:模式匹配频道多输入配置可以同时配置多个输入插件:input { file { path => "/var/log/app/*.log" type => "app-log" } beats { port => 5044 type => "beats-log" } kafka { bootstrap_servers => "localhost:9092" topics => ["system-logs"] type => "kafka-log" }}最佳实践合理使用 start_position:生产环境通常使用 "end"配置 sincedb_path:避免重启后重复读取使用类型和标签:便于后续过滤和处理启用 SSL:保护数据传输安全监控输入性能:使用指标监控输入插件的性能