乐闻世界logo
搜索文章和话题

Logstash 有哪些常用的输入插件,如何配置文件输入和 Kafka 输入?

2月21日 15:52

Logstash 支持多种输入插件,可以从各种数据源收集数据。以下是常用的输入插件及其使用方法。

1. File 输入插件

File 插件用于从文件系统读取日志文件。

基本配置

conf
input { file { path => "/var/log/*.log" start_position => "beginning" sincedb_path => "/dev/null" } }

重要参数

  • path:要读取的文件路径,支持通配符
  • start_position:开始读取的位置(beginning 或 end)
  • sincedb_path:记录读取位置的文件路径
  • type:为事件添加类型标识
  • tags:为事件添加标签

高级配置

conf
input { file { path => ["/var/log/apache/*.log", "/var/log/nginx/*.log"] exclude => ["*.gz", "*.zip"] start_position => "beginning" sincedb_path => "/var/lib/logstash/sincedb" discover_interval => 15 stat_interval => 1 mode => "read" file_completed_action => "delete" file_completed_log_path => "/var/log/logstash/completed.log" } }

2. Beats 输入插件

Beats 插件用于接收来自 Beats(如 Filebeat、Metricbeat)的数据。

基本配置

conf
input { beats { port => 5044 } }

重要参数

  • port:监听端口
  • host:绑定地址
  • ssl:启用 SSL/TLS
  • client_inactivity_timeout:客户端不活动超时时间

SSL 配置

conf
input { beats { port => 5044 ssl => true ssl_certificate => "/path/to/cert.pem" ssl_key => "/path/to/key.pem" ssl_certificate_authorities => ["/path/to/ca.pem"] ssl_verify_mode => "force_peer" } }

3. Kafka 输入插件

Kafka 插件用于从 Kafka 消息队列消费数据。

基本配置

conf
input { kafka { bootstrap_servers => "localhost:9092" topics => ["logs"] group_id => "logstash-consumer" } }

重要参数

  • bootstrap_servers:Kafka 服务器地址
  • topics:要消费的主题列表
  • group_id:消费者组 ID
  • consumer_threads:消费者线程数
  • decorate_events:添加 Kafka 元数据到事件

高级配置

conf
input { kafka { bootstrap_servers => ["kafka1:9092", "kafka2:9092"] topics => ["app-logs", "system-logs"] group_id => "logstash-group" consumer_threads => 4 fetch_min_bytes => 1 fetch_max_wait_ms => 100 max_partition_fetch_bytes => 1048576 session_timeout_ms => 10000 auto_offset_reset => "latest" enable_auto_commit => false decorate_events => true codec => "json" } }

4. HTTP 输入插件

HTTP 插件通过 HTTP 接口接收数据。

基本配置

conf
input { http { port => 8080 codec => "json" } }

重要参数

  • port:监听端口
  • host:绑定地址
  • codec:编解码器
  • ssl:启用 SSL

认证配置

conf
input { http { port => 8080 user => "admin" password => "secret" ssl => true ssl_certificate => "/path/to/cert.pem" ssl_key => "/path/to/key.pem" } }

5. TCP/UDP 输入插件

TCP/UDP 插件用于接收网络协议数据。

TCP 配置

conf
input { tcp { port => 5000 codec => "json_lines" mode => "server" } }

UDP 配置

conf
input { udp { port => 5001 codec => "json" workers => 2 } }

6. Syslog 输入插件

Syslog 插件用于接收系统日志。

基本配置

conf
input { syslog { port => 514 type => "syslog" } }

高级配置

conf
input { syslog { port => 514 host => "0.0.0.0" codec => "plain" use_rfc5424e => true grok_patterns => ["RSYSLOGBASE"] timezone => "UTC" } }

7. JDBC 输入插件

JDBC 插件用于从数据库读取数据。

基本配置

conf
input { jdbc { jdbc_driver_library => "/path/to/mysql-connector.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "user" jdbc_password => "password" schedule => "* * * * *" statement => "SELECT * FROM logs WHERE created_at > :sql_last_value" } }

重要参数

  • jdbc_driver_library:JDBC 驱动程序路径
  • jdbc_driver_class:JDBC 驱动类名
  • jdbc_connection_string:数据库连接字符串
  • schedule:执行计划(cron 表达式)
  • statement:SQL 查询语句
  • use_column_value:使用列值跟踪
  • tracking_column:跟踪列名
  • last_run_metadata_path:元数据存储路径

8. Redis 输入插件

Redis 插件用于从 Redis 读取数据。

基本配置

conf
input { redis { host => "localhost" port => 6379 data_type => "list" key => "logstash" } }

数据类型

  • list:列表类型
  • channel:发布订阅频道
  • pattern_channel:模式匹配频道

多输入配置

可以同时配置多个输入插件:

conf
input { file { path => "/var/log/app/*.log" type => "app-log" } beats { port => 5044 type => "beats-log" } kafka { bootstrap_servers => "localhost:9092" topics => ["system-logs"] type => "kafka-log" } }

最佳实践

  1. 合理使用 start_position:生产环境通常使用 "end"
  2. 配置 sincedb_path:避免重启后重复读取
  3. 使用类型和标签:便于后续过滤和处理
  4. 启用 SSL:保护数据传输安全
  5. 监控输入性能:使用指标监控输入插件的性能
标签:Logstash