Logstash
Logstash是一个开源的数据收集引擎,主要用于处理和转换各种日志和事件数据。它支持从多种来源(如文件、网络、消息队列和数据库等)收集数据,并将其转换为统一的格式,以便存储、分析和可视化。Logstash提供了大量的插件和过滤器,可以用于数据转换、数据清洗、数据标准化和数据增强等方面。它还支持多种输出,如Elasticsearch、Redis、Kafka和Splunk等。Logstash可以与Elasticsearch和Kibana等开源工具集成,形成一个完整的ELK(Elasticsearch、Logstash和Kibana)堆栈,用于搜索、分析和可视化数据。Logstash适用于多种场景,如安全监控、日志管理、应用性能监控和业务分析等。

查看更多相关内容
Logstash 在 ELK Stack 中扮演什么角色,与 Elasticsearch 和 Kibana 如何协作?ELK Stack 是由 Elasticsearch、Logstash 和 Kibana 三个开源项目组成的完整日志分析平台。它们各自承担不同的职责,协同工作实现日志的收集、处理、存储和可视化。
## ELK Stack 组件
### 1. Elasticsearch
**角色**:搜索引擎和数据存储
**主要功能**:
- 分布式、RESTful 风格的搜索和数据分析引擎
- 存储和索引大量数据
- 提供强大的全文搜索能力
- 支持复杂的数据聚合和分析
**特点**:
- 高性能、可扩展
- 近实时搜索
- 支持多种数据类型
- 提供 RESTful API
### 2. Logstash
**角色**:数据收集和处理管道
**主要功能**:
- 从多种数据源收集数据
- 解析、过滤和转换数据
- 将处理后的数据发送到目标系统
**特点**:
- 丰富的插件生态系统
- 灵活的数据处理能力
- 支持实时数据处理
- 可扩展的架构
### 3. Kibana
**角色**:数据可视化和分析平台
**主要功能**:
- 创建各种图表和仪表板
- 数据探索和分析
- 日志搜索和过滤
- 报告生成和导出
**特点**:
- 直观的用户界面
- 丰富的可视化选项
- 支持实时数据展示
- 可定制的仪表板
## ELK Stack 工作流程
```
数据源 → Logstash → Elasticsearch → Kibana
↓
数据处理
```
### 详细流程
1. **数据采集**
- Logstash 从各种数据源(文件、数据库、消息队列等)采集数据
- 也可以使用 Beats(Filebeat、Metricbeat 等)轻量级采集器
2. **数据处理**
- Logstash 对采集的数据进行解析、过滤和转换
- 使用 Grok、Mutate、Date 等过滤器处理数据
3. **数据存储**
- 处理后的数据发送到 Elasticsearch 进行索引和存储
- Elasticsearch 提供高效的搜索和检索能力
4. **数据可视化**
- Kibana 从 Elasticsearch 读取数据
- 创建图表、仪表板进行数据展示和分析
## 实际应用场景
### 1. 日志管理
```
应用服务器 → Filebeat → Logstash → Elasticsearch → Kibana
```
- 收集应用服务器日志
- 解析和结构化日志数据
- 存储和搜索日志
- 可视化日志分析
### 2. 系统监控
```
服务器 → Metricbeat → Logstash → Elasticsearch → Kibana
```
- 收集系统指标(CPU、内存、磁盘等)
- 聚合和分析监控数据
- 创建监控仪表板
- 设置告警规则
### 3. 安全分析
```
防火墙/IDS → Packetbeat → Logstash → Elasticsearch → Kibana
```
- 收集安全事件数据
- 分析安全威胁
- 可视化安全态势
- 生成安全报告
## Logstash 在 ELK Stack 中的作用
### 1. 数据转换
- 将非结构化日志转换为结构化数据
- 统一不同格式的日志
- 丰富数据内容(添加地理位置、用户代理信息等)
### 2. 数据过滤
- 过滤不需要的日志
- 提取关键字段
- 数据清洗和去重
### 3. 数据路由
- 根据日志类型路由到不同的索引
- 将错误日志发送到专门的存储
- 支持多输出目标
### 4. 数据缓冲
- 使用消息队列(Kafka、Redis)作为缓冲
- 处理突发流量
- 提高系统稳定性
## ELK Stack 优势
### 1. 开源免费
- 所有组件都是开源的
- 活跃的社区支持
- 丰富的文档和教程
### 2. 高度可扩展
- 支持水平扩展
- 处理大规模数据
- 适应业务增长
### 3. 灵活可定制
- 丰富的插件和配置选项
- 支持自定义开发
- 适应各种业务场景
### 4. 实时处理
- 近实时的数据处理和展示
- 快速响应业务需求
- 支持实时监控和告警
## 替代方案
### 1. EFK Stack
- 使用 Fluentd 替代 Logstash
- Fluentd 更轻量级
- 适合 Kubernetes 环境
### 2. ELKB Stack
- 添加 Beats 组件
- Beats 更轻量级的数据采集
- 适合边缘节点部署
### 3. 商业方案
- Splunk
- Datadog
- Sumo Logic
## 最佳实践
1. **合理规划架构**:根据业务需求选择合适的组件和配置
2. **监控和告警**:建立完善的监控和告警机制
3. **数据生命周期管理**:合理设置数据保留策略
4. **安全配置**:启用 SSL/TLS,配置访问控制
5. **性能优化**:根据数据量调整配置参数
服务端 · 2月21日 16:06
Logstash 中 Grok 过滤器的作用是什么,如何使用 Grok 解析日志?Grok 是 Logstash 中最强大和最常用的过滤器之一,它用于将非结构化的文本数据解析为结构化的数据格式。
## Grok 基本概念
Grok 基于正则表达式,通过预定义的模式将文本解析为字段。Grok 语法格式为:
```
%{PATTERN:field_name}
```
其中:
- **PATTERN**:预定义的模式名称
- **field_name**:解析后存储的字段名称
## 常用 Grok 模式
### 基础模式
- `%{NUMBER:num}`:匹配数字
- `%{WORD:word}`:匹配单词
- `%{DATA:data}`:匹配任意数据
- `%{GREEDYDATA:msg}`:贪婪匹配剩余数据
- `%{IP:ip}`:匹配 IP 地址
- `%{DATE:date}`:匹配日期
### 日志模式
- `%{COMBINEDAPACHELOG}`:Apache 组合日志格式
- `%{COMMONAPACHELOG}`:Apache 通用日志格式
- `%{NGINXACCESS}`:Nginx 访问日志格式
- `%{SYSLOGBASE}`:系统日志基础格式
## 实际应用示例
### 1. Apache 访问日志解析
```conf
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
```
解析后会生成以下字段:
- clientip
- ident
- auth
- timestamp
- verb
- request
- httpversion
- response
- bytes
- referrer
- agent
### 2. 自定义日志格式
假设日志格式为:
```
2024-02-21 10:30:45 [INFO] User john.doe logged in from 192.168.1.100
```
配置如下:
```conf
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}"
}
}
}
```
### 3. 复杂日志解析
```conf
filter {
grok {
match => {
"message" => "%{IP:client_ip} - %{USER:user} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_code} %{NUMBER:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\""
}
}
}
```
## 自定义 Grok 模式
可以在配置文件中定义自定义模式:
```conf
filter {
grok {
patterns_dir => ["/path/to/patterns"]
match => {
"message" => "%{CUSTOM_PATTERN:custom_field}"
}
}
}
```
在 patterns 文件中定义:
```
CUSTOM_PATTERN [0-9]{3}-[A-Z]{2}
```
## 多模式匹配
Grok 支持多个匹配模式,按顺序尝试:
```conf
filter {
grok {
match => {
"message" => [
"%{COMBINEDAPACHELOG}",
"%{COMMONAPACHELOG}",
"%{NGINXACCESS}"
]
}
}
}
```
## Grok 调试工具
### 1. Grok Debugger
使用在线 Grok Debugger 工具测试和调试模式:
- Kibana Dev Tools 中的 Grok Debugger
- Elastic 官方在线调试器
### 2. 添加标签便于调试
```conf
filter {
grok {
match => { "message" => "%{PATTERN:field}" }
add_tag => ["_grokparsefailure"]
tag_on_failure => ["_grokparsefailure"]
}
}
```
## 性能优化
1. **使用预编译模式**:Logstash 会缓存编译后的模式
2. **避免贪婪匹配**:使用更精确的模式提高性能
3. **减少模式数量**:只使用必要的模式
4. **使用条件判断**:对特定类型的数据应用特定的 grok 模式
## 最佳实践
1. **从简单到复杂**:先测试简单的模式,逐步增加复杂度
2. **使用命名捕获组**:提高代码可读性
3. **处理解析失败**:使用 `_grokparsefailure` 标签处理解析失败的情况
4. **文档化自定义模式**:为自定义模式添加注释说明
5. **版本控制**:将自定义模式文件纳入版本控制
服务端 · 2月21日 16:02
什么是 Logstash,它的主要功能和工作原理是什么?Logstash 是一个开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的"存储库"中。
## 核心功能
Logstash 的主要功能包括:
- **数据采集**:从各种数据源收集日志和事件数据
- **数据转换**:解析、过滤、丰富和规范化数据
- **数据输出**:将处理后的数据发送到目标存储系统
## 工作原理
Logstash 采用插件式架构,主要包含三个组件:
### 1. Input 插件
负责从数据源读取数据,常见的数据源包括:
- 文件(File)
- 系统日志(Syslog)
- 网络协议(HTTP、TCP、UDP)
- 消息队列(Kafka、RabbitMQ)
- 数据库(JDBC)
- Beats(Filebeat、Metricbeat 等)
### 2. Filter 插件
对数据进行解析、过滤和转换,常用的过滤器包括:
- **grok**:将非结构化数据解析为结构化格式
- **mutate**:对字段进行重命名、删除、替换等操作
- **date**:解析时间戳并转换为 Logstash @timestamp 字段
- **geoip**:根据 IP 地址添加地理位置信息
- **ruby**:使用 Ruby 代码进行复杂的数据转换
### 3. Output 插件
将处理后的数据发送到目标系统,常见的输出目标包括:
- Elasticsearch
- 文件系统
- 消息队列
- 数据库
- 监控系统
## 典型应用场景
1. **日志聚合**:收集分布式系统中的各种日志
2. **日志分析**:对日志进行解析、过滤和结构化
3. **数据转换**:将不同格式的数据转换为统一格式
4. **实时监控**:实时处理和转发监控数据
5. **ELK Stack**:与 Elasticsearch 和 Kibana 组成完整的日志分析平台
## 优势特点
- **灵活性**:支持多种输入和输出格式
- **可扩展性**:丰富的插件生态系统
- **实时处理**:支持流式数据处理
- **易于配置**:使用简单的配置文件定义数据处理流程
- **高可用性**:支持集群部署和负载均衡
服务端 · 2月21日 15:52
Logstash 中如何使用条件判断语句,有哪些常见的条件操作符?Logstash 支持条件判断语句,可以根据字段值、标签或其他条件来控制数据流。这使得我们能够对不同的数据应用不同的处理逻辑。
## 条件判断语法
Logstash 支持以下条件操作符:
### 比较操作符
- `==`:等于
- `!=`:不等于
- `<`:小于
- `>`:大于
- `<=`:小于等于
- `>=`:大于等于
### 逻辑操作符
- `and`:逻辑与
- `or`:逻辑或
- `nand`:逻辑与非
- `xor`:逻辑异或
- `not`:逻辑非
### 正则表达式
- `=~`:匹配正则表达式
- `!~`:不匹配正则表达式
### 包含操作
- `in`:包含在数组中
- `not in`:不包含在数组中
## 基本条件判断
### if 语句
```conf
filter {
if [type] == "apache" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
}
```
### if-else 语句
```conf
filter {
if [type] == "apache" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
} else if [type] == "nginx" {
grok {
match => { "message" => "%{NGINXACCESS}" }
}
} else {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
}
}
}
```
## 字段值判断
### 字符串比较
```conf
filter {
if [log_level] == "ERROR" {
mutate {
add_tag => ["error_log"]
}
}
}
```
### 数值比较
```conf
filter {
if [response] >= 400 {
mutate {
add_tag => ["http_error"]
}
}
}
```
### 正则表达式匹配
```conf
filter {
if [message] =~ /exception/i {
mutate {
add_tag => ["exception"]
}
}
}
```
## 标签判断
### 检查标签是否存在
```conf
filter {
if "error" in [tags] {
# 处理错误日志
}
}
```
### 添加标签
```conf
filter {
if [status] >= 500 {
mutate {
add_tag => ["server_error"]
}
} else if [status] >= 400 {
mutate {
add_tag => ["client_error"]
}
}
}
```
## 复杂条件判断
### 多条件组合
```conf
filter {
if [type] == "apache" and [response] >= 400 {
mutate {
add_tag => ["apache_error"]
}
}
}
```
### 使用括号分组
```conf
filter {
if ([type] == "apache" or [type] == "nginx") and [response] >= 400 {
mutate {
add_tag => ["web_error"]
}
}
}
```
### 嵌套条件
```conf
filter {
if [type] == "apache" {
if [response] >= 500 {
mutate {
add_tag => ["apache_server_error"]
}
} else if [response] >= 400 {
mutate {
add_tag => ["apache_client_error"]
}
}
}
}
```
## 在 Input 中使用条件判断
```conf
input {
file {
path => "/var/log/*.log"
type => "system"
}
if [type] == "system" {
file {
path => "/var/log/syslog"
type => "syslog"
}
}
}
```
## 在 Filter 中使用条件判断
### 根据字段值应用不同的过滤器
```conf
filter {
if [type] == "json" {
json {
source => "message"
}
} else {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
}
}
}
```
### 处理解析失败
```conf
filter {
grok {
match => { "message" => "%{PATTERN:field}" }
tag_on_failure => ["_grokparsefailure"]
}
if "_grokparsefailure" in [tags] {
# 处理解析失败的情况
mutate {
add_field => { "parse_error" => "true" }
}
}
}
```
## 在 Output 中使用条件判断
### 根据日志级别路由到不同的索引
```conf
output {
if [log_level] == "ERROR" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "error-logs-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "access-logs-%{+YYYY.MM.dd}"
}
}
}
```
### 多输出条件路由
```conf
output {
# 错误日志发送到专门的索引
if [level] == "ERROR" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "errors-%{+YYYY.MM.dd}"
}
}
# 访问日志发送到 Kafka
if [type] == "access" {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "access-logs"
}
}
# 所有日志都输出到文件备份
file {
path => "/backup/all-logs.log"
}
}
```
## 字段存在性检查
### 检查字段是否存在
```conf
filter {
if [user_id] {
# user_id 字段存在
mutate {
add_tag => ["has_user_id"]
}
}
}
```
### 检查字段是否为空
```conf
filter {
if [user_id] and [user_id] != "" {
# user_id 字段存在且不为空
mutate {
add_tag => ["valid_user_id"]
}
}
}
```
## 数组操作
### 检查数组是否包含元素
```conf
filter {
if "error" in [tags] {
# tags 数组包含 "error"
}
}
```
### 检查数组长度
```conf
filter {
if [tags] and [tags].length > 0 {
# tags 数组不为空
}
}
```
## 最佳实践
1. **使用条件判断提高性能**:避免对不必要的数据进行处理
2. **合理使用标签**:使用标签标记不同类型的数据
3. **处理异常情况**:使用条件判断处理解析失败等异常
4. **代码可读性**:使用括号和缩进提高配置文件的可读性
5. **测试验证**:使用测试数据验证条件判断逻辑
## 实际应用示例
### 完整的日志处理流程
```conf
input {
file {
path => "/var/log/app/*.log"
start_position => "beginning"
}
}
filter {
# 根据日志类型应用不同的解析逻辑
if [message] =~ /^\{/ {
# JSON 格式日志
json {
source => "message"
}
} else {
# 文本格式日志
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}" }
}
}
# 根据日志级别添加标签
if [level] == "ERROR" or [level] == "FATAL" {
mutate {
add_tag => ["error", "alert"]
}
}
# 解析时间戳
if [timestamp] {
date {
match => ["timestamp", "ISO8601"]
}
}
}
output {
# 错误日志发送到专门的索引
if "alert" in [tags] {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "alerts-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
}
```
服务端 · 2月21日 15:52
如何部署和管理 Logstash 集群,有哪些高可用方案?Logstash 支持集群部署,可以通过多个 Logstash 实例组成集群来提高处理能力和可用性。以下是 Logstash 集群部署和管理的相关内容。
## 集群架构
### 单机部署
```
数据源 → Logstash → Elasticsearch
```
适用于小规模场景,单台 Logstash 实例处理所有数据。
### 集群部署
```
数据源 → 负载均衡器 → Logstash 集群 → Elasticsearch
├── Logstash Node 1
├── Logstash Node 2
└── Logstash Node 3
```
适用于大规模场景,多个 Logstash 实例分担负载。
## 负载均衡策略
### 1. 使用 Beats 负载均衡
```conf
# Filebeat 配置
output.logstash:
hosts: ["logstash1:5044", "logstash2:5044", "logstash3:5044"]
loadbalance: true
worker: 2
```
### 2. 使用消息队列
```
数据源 → Kafka → Logstash 集群 → Elasticsearch
├── Logstash 1
├── Logstash 2
└── Logstash 3
```
### 3. 使用负载均衡器
```
数据源 → Nginx/HAProxy → Logstash 集群 → Elasticsearch
```
## 持久化队列
Logstash 支持持久化队列,可以在重启时保留数据,防止数据丢失。
### 启用持久化队列
```conf
# logstash.yml
queue.type: persisted
path.queue: /path/to/queue/data
queue.page_capacity: 250mb
queue.max_events: 0
queue.max_bytes: 1gb
queue.drain: true
```
### 内存队列
```conf
# logstash.yml
queue.type: memory
queue.max_events: 10000
```
## 配置管理
### 1. 配置文件同步
使用配置管理工具(如 Ansible、Puppet、Chef)同步配置文件到所有节点。
### 2. 配置中心
使用配置中心(如 Consul、etcd)管理配置。
### 3. 配置版本控制
将配置文件纳入版本控制系统(Git)。
## 监控和告警
### 1. Logstash 监控 API
```bash
# 查看节点信息
curl -XGET 'localhost:9600/_node'
# 查看管道统计
curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
# 查看插件统计
curl -XGET 'localhost:9600/_node/stats/plugins?pretty'
```
### 2. Prometheus 集成
```conf
# logstash.yml
http.host: "0.0.0.0"
http.port: 9600
monitoring.enabled: true
monitoring.elasticsearch.hosts: ["http://es:9200"]
```
### 3. 关键指标
- Events per second (EPS)
- Pipeline latency
- Queue size
- JVM memory usage
- CPU usage
## 高可用性
### 1. 多实例部署
部署多个 Logstash 实例,避免单点故障。
### 2. 持久化队列
启用持久化队列,防止数据丢失。
### 3. 健康检查
配置健康检查,自动重启失败的实例。
### 4. 自动扩缩容
根据负载自动调整实例数量。
## 性能调优
### 1. Pipeline Workers
```conf
# logstash.yml
pipeline.workers: 4
```
设置为 CPU 核心数的 1-2 倍。
### 2. Batch Size
```conf
# logstash.yml
pipeline.batch.size: 500
```
增加批量大小可以提高吞吐量。
### 3. JVM 内存
```bash
# config/jvm.options
-Xms4g
-Xmx4g
```
### 4. 垃圾回收器
```bash
# config/jvm.options
-XX:+UseG1GC
```
## 故障排查
### 1. 查看日志
```bash
tail -f /var/log/logstash/logstash-plain.log
```
### 2. 检查配置
```bash
bin/logstash --config.test_and_exit -f /path/to/config.conf
```
### 3. 调试模式
```bash
bin/logstash --config.debug -f /path/to/config.conf
```
### 4. 查看管道状态
```bash
curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
```
## 实际部署示例
### Docker Compose 部署
```yaml
version: '3'
services:
logstash1:
image: docker.elastic.co/logstash/logstash:8.0.0
volumes:
- ./config/logstash1.conf:/usr/share/logstash/pipeline/logstash.conf
- ./config/logstash.yml:/usr/share/logstash/config/logstash.yml
ports:
- "5044:5044"
- "9600:9600"
environment:
- "LS_JAVA_OPTS=-Xms2g -Xmx2g"
logstash2:
image: docker.elastic.co/logstash/logstash:8.0.0
volumes:
- ./config/logstash2.conf:/usr/share/logstash/pipeline/logstash.conf
- ./config/logstash.yml:/usr/share/logstash/config/logstash.yml
ports:
- "5045:5044"
- "9601:9600"
environment:
- "LS_JAVA_OPTS=-Xms2g -Xmx2g"
```
### Kubernetes 部署
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: logstash
spec:
replicas: 3
selector:
matchLabels:
app: logstash
template:
metadata:
labels:
app: logstash
spec:
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash:8.0.0
ports:
- containerPort: 5044
- containerPort: 9600
resources:
limits:
memory: "4Gi"
cpu: "2"
requests:
memory: "2Gi"
cpu: "1"
volumeMounts:
- name: config
mountPath: /usr/share/logstash/pipeline
volumes:
- name: config
configMap:
name: logstash-config
```
## 最佳实践
1. **规划容量**:根据数据量规划集群规模
2. **监控告警**:建立完善的监控和告警机制
3. **配置管理**:使用配置管理工具统一管理配置
4. **数据备份**:定期备份配置和数据
5. **安全加固**:启用 SSL/TLS,配置访问控制
6. **性能测试**:上线前进行充分的性能测试
7. **文档记录**:记录部署和运维文档
服务端 · 2月21日 15:52
Logstash 有哪些常用的输出插件,如何配置 Elasticsearch 输出?Logstash 支持多种输出插件,可以将处理后的数据发送到各种目标系统。以下是常用的输出插件及其配置方法。
## 1. Elasticsearch 输出插件
Elasticsearch 是 Logstash 最常用的输出目标。
### 基本配置
```conf
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
```
### 重要参数
- **hosts**:Elasticsearch 节点地址列表
- **index**:索引名称,支持日期模式
- **document_type**:文档类型(ES 7.x 后已废弃)
- **document_id**:文档 ID
- **action**:操作类型(index、create、update、delete)
- **pipeline**:ES 管道名称
### 高级配置
```conf
output {
elasticsearch {
hosts => ["http://es1:9200", "http://es2:9200"]
index => "app-logs-%{[service]}-%{+YYYY.MM.dd}"
document_id => "%{[@metadata][_id]}"
action => "update"
doc_as_upsert => true
pipeline => "timestamp_pipeline"
# 性能优化
flush_size => 500
idle_flush_time => 1
retry_on_conflict => 3
# SSL 配置
ssl => true
cacert => "/path/to/ca.crt"
user => "elastic"
password => "changeme"
}
}
```
### 条件索引
```conf
output {
if [type] == "error" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "error-logs-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "access-logs-%{+YYYY.MM.dd}"
}
}
}
```
## 2. File 输出插件
File 插件将数据写入文件系统。
### 基本配置
```conf
output {
file {
path => "/path/to/output.log"
}
}
```
### 重要参数
- **path**:输出文件路径
- **codec**:编解码器
- **flush_interval**:刷新间隔
- **gzip**:启用 gzip 压缩
### 高级配置
```conf
output {
file {
path => "/var/log/logstash/%{type}-%{+YYYY-MM-dd}.log"
codec => line { format => "%{message}" }
flush_interval => 5
gzip => true
file_mode => 0644
dir_mode => 0755
}
}
```
## 3. Kafka 输出插件
Kafka 插件将数据发送到 Kafka 消息队列。
### 基本配置
```conf
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "processed-logs"
}
}
```
### 重要参数
- **bootstrap_servers**:Kafka 服务器地址
- **topic_id**:主题名称
- **codec**:编解码器
- **compression_type**:压缩类型(none、gzip、snappy、lz4、zstd)
### 高级配置
```conf
output {
kafka {
bootstrap_servers => ["kafka1:9092", "kafka2:9092"]
topic_id => "processed-logs"
codec => "json"
compression_type => "snappy"
acks => "all"
retries => 3
batch_size => 16384
linger_ms => 10
buffer_memory => 33554432
# SSL 配置
security_protocol => "SSL"
ssl_keystore_location => "/path/to/keystore.jks"
ssl_keystore_password => "password"
ssl_truststore_location => "/path/to/truststore.jks"
ssl_truststore_password => "password"
}
}
```
### 动态主题
```conf
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "%{[service]}-logs"
}
}
```
## 4. Redis 输出插件
Redis 插件将数据发送到 Redis。
### 基本配置
```conf
output {
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "logstash"
}
}
```
### 数据类型
- **list**:列表类型
- **channel**:发布订阅频道
- **set**:集合类型
### 高级配置
```conf
output {
redis {
host => "redis.example.com"
port => 6379
data_type => "list"
key => "logstash-%{[type]}"
codec => "json"
db => 0
password => "secret"
timeout => 5
reconnect_attempts => 3
reconnect_interval => 2
}
}
```
## 5. HTTP 输出插件
HTTP 插件通过 HTTP 接口发送数据。
### 基本配置
```conf
output {
http {
url => "http://example.com/api/logs"
http_method => "post"
format => "json"
}
}
```
### 重要参数
- **url**:目标 URL
- **http_method**:HTTP 方法(post、put、patch)
- **format**:数据格式(json、form、message)
- **headers**:HTTP 请求头
### 高级配置
```conf
output {
http {
url => "http://api.example.com/v1/logs"
http_method => "post"
format => "json"
headers => {
"Content-Type" => "application/json"
"Authorization" => "Bearer %{[api_token]}"
}
mapping => {
"timestamp" => "%{@timestamp}"
"message" => "%{message}"
"level" => "%{[log_level]}"
}
pool_size => 50
pool_max_per_route => 25
keepalive => true
retry_non_idempotent => true
}
}
```
## 6. Stdout 输出插件
Stdout 插件将数据输出到标准输出,常用于调试。
### 基本配置
```conf
output {
stdout {
codec => rubydebug
}
}
```
### 编解码器选项
- **rubydebug**:格式化输出
- **json**:JSON 格式
- **json_lines**:每行一个 JSON
- **dots**:点号输出
## 7. 多输出配置
可以同时配置多个输出插件:
```conf
output {
# 输出到 Elasticsearch
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
# 同时输出到文件备份
file {
path => "/backup/logs-%{+YYYY-MM-dd}.log"
}
# 错误日志发送到 Kafka
if [level] == "ERROR" {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "error-logs"
}
}
}
```
## 8. 条件输出
使用条件语句控制数据流向:
```conf
output {
if [type] == "apache" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "apache-%{+YYYY.MM.dd}"
}
} else if [type] == "nginx" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
} else {
file {
path => "/var/log/other-logs.log"
}
}
}
```
## 最佳实践
1. **批量写入**:使用 flush_size 和 idle_flush_time 优化性能
2. **错误处理**:配置重试机制和错误日志
3. **索引策略**:合理设计索引命名和分片策略
4. **安全配置**:使用 SSL/TLS 保护数据传输
5. **监控指标**:监控输出插件的性能指标
6. **备份策略**:重要数据配置多个输出目标
服务端 · 2月21日 15:52
Logstash 有哪些常用的输入插件,如何配置文件输入和 Kafka 输入?Logstash 支持多种输入插件,可以从各种数据源收集数据。以下是常用的输入插件及其使用方法。
## 1. File 输入插件
File 插件用于从文件系统读取日志文件。
### 基本配置
```conf
input {
file {
path => "/var/log/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
```
### 重要参数
- **path**:要读取的文件路径,支持通配符
- **start_position**:开始读取的位置(beginning 或 end)
- **sincedb_path**:记录读取位置的文件路径
- **type**:为事件添加类型标识
- **tags**:为事件添加标签
### 高级配置
```conf
input {
file {
path => ["/var/log/apache/*.log", "/var/log/nginx/*.log"]
exclude => ["*.gz", "*.zip"]
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
discover_interval => 15
stat_interval => 1
mode => "read"
file_completed_action => "delete"
file_completed_log_path => "/var/log/logstash/completed.log"
}
}
```
## 2. Beats 输入插件
Beats 插件用于接收来自 Beats(如 Filebeat、Metricbeat)的数据。
### 基本配置
```conf
input {
beats {
port => 5044
}
}
```
### 重要参数
- **port**:监听端口
- **host**:绑定地址
- **ssl**:启用 SSL/TLS
- **client_inactivity_timeout**:客户端不活动超时时间
### SSL 配置
```conf
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/path/to/cert.pem"
ssl_key => "/path/to/key.pem"
ssl_certificate_authorities => ["/path/to/ca.pem"]
ssl_verify_mode => "force_peer"
}
}
```
## 3. Kafka 输入插件
Kafka 插件用于从 Kafka 消息队列消费数据。
### 基本配置
```conf
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["logs"]
group_id => "logstash-consumer"
}
}
```
### 重要参数
- **bootstrap_servers**:Kafka 服务器地址
- **topics**:要消费的主题列表
- **group_id**:消费者组 ID
- **consumer_threads**:消费者线程数
- **decorate_events**:添加 Kafka 元数据到事件
### 高级配置
```conf
input {
kafka {
bootstrap_servers => ["kafka1:9092", "kafka2:9092"]
topics => ["app-logs", "system-logs"]
group_id => "logstash-group"
consumer_threads => 4
fetch_min_bytes => 1
fetch_max_wait_ms => 100
max_partition_fetch_bytes => 1048576
session_timeout_ms => 10000
auto_offset_reset => "latest"
enable_auto_commit => false
decorate_events => true
codec => "json"
}
}
```
## 4. HTTP 输入插件
HTTP 插件通过 HTTP 接口接收数据。
### 基本配置
```conf
input {
http {
port => 8080
codec => "json"
}
}
```
### 重要参数
- **port**:监听端口
- **host**:绑定地址
- **codec**:编解码器
- **ssl**:启用 SSL
### 认证配置
```conf
input {
http {
port => 8080
user => "admin"
password => "secret"
ssl => true
ssl_certificate => "/path/to/cert.pem"
ssl_key => "/path/to/key.pem"
}
}
```
## 5. TCP/UDP 输入插件
TCP/UDP 插件用于接收网络协议数据。
### TCP 配置
```conf
input {
tcp {
port => 5000
codec => "json_lines"
mode => "server"
}
}
```
### UDP 配置
```conf
input {
udp {
port => 5001
codec => "json"
workers => 2
}
}
```
## 6. Syslog 输入插件
Syslog 插件用于接收系统日志。
### 基本配置
```conf
input {
syslog {
port => 514
type => "syslog"
}
}
```
### 高级配置
```conf
input {
syslog {
port => 514
host => "0.0.0.0"
codec => "plain"
use_rfc5424e => true
grok_patterns => ["RSYSLOGBASE"]
timezone => "UTC"
}
}
```
## 7. JDBC 输入插件
JDBC 插件用于从数据库读取数据。
### 基本配置
```conf
input {
jdbc {
jdbc_driver_library => "/path/to/mysql-connector.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "user"
jdbc_password => "password"
schedule => "* * * * *"
statement => "SELECT * FROM logs WHERE created_at > :sql_last_value"
}
}
```
### 重要参数
- **jdbc_driver_library**:JDBC 驱动程序路径
- **jdbc_driver_class**:JDBC 驱动类名
- **jdbc_connection_string**:数据库连接字符串
- **schedule**:执行计划(cron 表达式)
- **statement**:SQL 查询语句
- **use_column_value**:使用列值跟踪
- **tracking_column**:跟踪列名
- **last_run_metadata_path**:元数据存储路径
## 8. Redis 输入插件
Redis 插件用于从 Redis 读取数据。
### 基本配置
```conf
input {
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "logstash"
}
}
```
### 数据类型
- **list**:列表类型
- **channel**:发布订阅频道
- **pattern_channel**:模式匹配频道
## 多输入配置
可以同时配置多个输入插件:
```conf
input {
file {
path => "/var/log/app/*.log"
type => "app-log"
}
beats {
port => 5044
type => "beats-log"
}
kafka {
bootstrap_servers => "localhost:9092"
topics => ["system-logs"]
type => "kafka-log"
}
}
```
## 最佳实践
1. **合理使用 start_position**:生产环境通常使用 "end"
2. **配置 sincedb_path**:避免重启后重复读取
3. **使用类型和标签**:便于后续过滤和处理
4. **启用 SSL**:保护数据传输安全
5. **监控输入性能**:使用指标监控输入插件的性能
服务端 · 2月21日 15:52
Logstash 有哪些常用的过滤器,如何使用 Grok 和 Mutate 过滤器?Logstash 提供了多种过滤器插件,用于对数据进行解析、转换和丰富。以下是常用的过滤器及其使用方法。
## 1. Grok 过滤器
Grok 是最强大的过滤器,用于将非结构化数据解析为结构化数据。
### 基本用法
```conf
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
```
### 多模式匹配
```conf
filter {
grok {
match => {
"message" => [
"%{COMBINEDAPACHELOG}",
"%{COMMONAPACHELOG}",
"%{NGINXACCESS}"
]
}
}
}
```
### 自定义模式
```conf
filter {
grok {
patterns_dir => ["/path/to/patterns"]
match => {
"message" => "%{CUSTOM_PATTERN:custom_field}"
}
}
}
```
## 2. Mutate 过滤器
Mutate 过滤器用于对字段进行各种操作。
### 重命名字段
```conf
filter {
mutate {
rename => { "old_name" => "new_name" }
}
}
```
### 转换字段类型
```conf
filter {
mutate {
convert => {
"status" => "integer"
"price" => "float"
"enabled" => "boolean"
}
}
}
```
### 删除字段
```conf
filter {
mutate {
remove_field => ["temp_field", "debug_info"]
}
}
```
### 替换字段值
```conf
filter {
mutate {
replace => { "message" => "new message" }
}
}
```
### 添加字段
```conf
filter {
mutate {
add_field => {
"environment" => "production"
"processed_at" => "%{@timestamp}"
}
}
}
```
### 合并字段
```conf
filter {
mutate {
merge => { "field1" => "field2" }
}
}
```
## 3. Date 过滤器
Date 过滤器用于解析时间戳并转换为 Logstash 的 @timestamp 字段。
### 基本用法
```conf
filter {
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
```
### 多种日期格式
```conf
filter {
date {
match => [
"timestamp",
"dd/MMM/yyyy:HH:mm:ss Z",
"yyyy-MM-dd HH:mm:ss",
"ISO8601"
]
}
}
```
### 自定义目标字段
```conf
filter {
date {
match => ["log_time", "yyyy-MM-dd HH:mm:ss"]
target => "parsed_time"
}
}
```
### 时区设置
```conf
filter {
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
timezone => "Asia/Shanghai"
}
}
```
## 4. GeoIP 过滤器
GeoIP 过滤器根据 IP 地址添加地理位置信息。
### 基本用法
```conf
filter {
geoip {
source => "client_ip"
}
}
```
### 指定目标字段
```conf
filter {
geoip {
source => "client_ip"
target => "geoip"
}
}
```
### 指定数据库路径
```conf
filter {
geoip {
source => "client_ip"
database => "/path/to/GeoLite2-City.mmdb"
}
}
```
### 指定字段
```conf
filter {
geoip {
source => "client_ip"
fields => ["city_name", "country_name", "location"]
}
}
```
## 5. Useragent 过滤器
Useragent 过滤器解析 User-Agent 字符串。
### 基本用法
```conf
filter {
useragent {
source => "agent"
}
}
```
### 指定目标字段
```conf
filter {
useragent {
source => "agent"
target => "ua"
}
}
```
## 6. CSV 过滤器
CSV 过滤器解析 CSV 格式的数据。
### 基本用法
```conf
filter {
csv {
separator => ","
columns => ["name", "age", "city"]
}
}
```
### 自动检测列名
```conf
filter {
csv {
separator => ","
autodetect_column_types => true
}
}
```
## 7. JSON 过滤器
JSON 过滤器解析 JSON 字符串。
### 基本用法
```conf
filter {
json {
source => "message"
}
}
```
### 指定目标字段
```conf
filter {
json {
source => "message"
target => "parsed_json"
}
}
```
### 保留原始字段
```conf
filter {
json {
source => "message"
remove_field => ["message"]
}
}
```
## 8. Ruby 过滤器
Ruby 过滤器允许使用 Ruby 代码进行复杂的数据处理。
### 基本用法
```conf
filter {
ruby {
code => 'event.set("computed_field", event.get("field1") + event.get("field2"))'
}
}
```
### 复杂逻辑
```conf
filter {
ruby {
code => '
if event.get("status").to_i >= 400
event.tag("error")
else
event.tag("success")
end
'
}
}
```
### 数组操作
```conf
filter {
ruby {
code => '
items = event.get("items")
if items.is_a?(Array)
event.set("item_count", items.length)
event.set("total_price", items.sum { |i| i["price"] })
end
'
}
}
```
## 9. Drop 过滤器
Drop 过滤器用于丢弃事件。
### 条件丢弃
```conf
filter {
if [log_level] == "DEBUG" {
drop { }
}
}
```
### 百分比丢弃
```conf
filter {
ruby {
code => 'event.cancel if rand < 0.1'
}
}
```
## 10. Aggregate 过滤器
Aggregate 过滤器用于聚合多个事件。
### 基本用法
```conf
filter {
aggregate {
task_id => "%{user_id}"
code => '
map["count"] ||= 0
map["count"] += 1
'
push_map_as_event => true
timeout => 60
}
}
```
## 过滤器组合
多个过滤器可以组合使用:
```conf
filter {
# 解析日志格式
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# 转换字段类型
mutate {
convert => { "response" => "integer" }
}
# 解析时间戳
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
# 添加地理位置信息
geoip {
source => "clientip"
}
# 解析 User-Agent
useragent {
source => "agent"
}
}
```
## 最佳实践
1. **过滤器顺序**:按逻辑顺序排列过滤器
2. **条件判断**:使用条件语句避免不必要的处理
3. **性能优化**:避免使用复杂的 Ruby 代码
4. **错误处理**:处理解析失败的情况
5. **测试验证**:使用 Grok Debugger 等工具测试过滤器
服务端 · 2月21日 15:52
Logstash 有哪些常用的插件,如何安装和管理插件?Logstash 提供了丰富的插件生态系统,可以通过插件扩展功能。以下是关于 Logstash 插件的相关内容。
## 插件类型
Logstash 插件主要分为三类:
### 1. Input 插件
负责从数据源读取数据。
**常用插件**:
- **file**:从文件系统读取文件
- **beats**:接收来自 Beats 的数据
- **kafka**:从 Kafka 消费数据
- **http**:通过 HTTP 接口接收数据
- **tcp/udp**:接收 TCP/UDP 数据
- **syslog**:接收系统日志
- **jdbc**:从数据库读取数据
- **redis**:从 Redis 读取数据
- **s3**:从 AWS S3 读取数据
- **elasticsearch**:从 Elasticsearch 读取数据
### 2. Filter 插件
负责对数据进行处理和转换。
**常用插件**:
- **grok**:解析非结构化数据
- **mutate**:字段操作(重命名、删除、转换等)
- **date**:解析时间戳
- **geoip**:添加地理位置信息
- **useragent**:解析 User-Agent
- **json**:解析 JSON 数据
- **csv**:解析 CSV 数据
- **ruby**:使用 Ruby 代码处理数据
- **aggregate**:聚合多个事件
- **drop**:丢弃事件
### 3. Output 插件
负责将数据发送到目标系统。
**常用插件**:
- **elasticsearch**:发送到 Elasticsearch
- **file**:写入文件
- **kafka**:发送到 Kafka
- **redis**:发送到 Redis
- **http**:通过 HTTP 发送数据
- **stdout**:输出到标准输出
- **email**:发送邮件
- **s3**:发送到 AWS S3
- **mongodb**:发送到 MongoDB
## 插件管理
### 1. 查看已安装插件
```bash
bin/logstash-plugin list
```
### 2. 查看插件详细信息
```bash
bin/logstash-plugin list --verbose
```
### 3. 安装插件
```bash
# 从官方仓库安装
bin/logstash-plugin install logstash-output-s3
# 指定版本安装
bin/logstash-plugin install logstash-output-s3 --version 10.0.0
# 从本地文件安装
bin/logstash-plugin install /path/to/plugin.zip
```
### 4. 更新插件
```bash
# 更新所有插件
bin/logstash-plugin update
# 更新指定插件
bin/logstash-plugin update logstash-output-s3
```
### 5. 卸载插件
```bash
bin/logstash-plugin uninstall logstash-output-s3
```
### 6. 验证插件
```bash
bin/logstash-plugin verify
```
## 常用插件详解
### 1. File Input 插件
```conf
input {
file {
path => "/var/log/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
type => "syslog"
tags => ["system"]
}
}
```
### 2. Grok Filter 插件
```conf
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
patterns_dir => ["/path/to/patterns"]
overwrite => ["message"]
}
}
```
### 3. Elasticsearch Output 插件
```conf
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logstash-%{+YYYY.MM.dd}"
document_type => "_doc"
flush_size => 500
idle_flush_time => 1
}
}
```
### 4. Kafka Output 插件
```conf
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "logs"
codec => "json"
compression_type => "snappy"
}
}
```
## 自定义插件开发
### 1. 插件类型选择
根据需求选择开发 Input、Filter 或 Output 插件。
### 2. 创建插件项目
```bash
# 使用 Logstash 插件生成器
gem install logstash-plugin-generator
logstash-plugin generate --type input --name myinput
```
### 3. 插件结构
```
logstash-input-myinput/
├── lib/
│ └── logstash/
│ └── inputs/
│ └── myinput.rb
├── spec/
│ └── inputs/
│ └── myinput_spec.rb
├── Gemfile
├── logstash-input-myinput.gemspec
└── README.md
```
### 4. 插件代码示例
```ruby
# lib/logstash/inputs/myinput.rb
require "logstash/inputs/base"
require "logstash/namespace"
require "socket"
class LogStash::Inputs::Myinput < LogStash::Inputs::Base
config_name "myinput"
config :host, :validate => :string, :default => "0.0.0.0"
config :port, :validate => :number, :required => true
def register
@logger.info("Registering myinput", :host => @host, :port => @port)
end
def run(queue)
@server = TCPServer.new(@host, @port)
loop do
client = @server.accept
Thread.new do
begin
while line = client.gets
event = LogStash::Event.new("message" => line)
decorate(event)
queue << event
end
rescue => e
@logger.error("Error", :exception => e)
ensure
client.close
end
end
end
end
def stop
@server.close if @server
end
end
```
### 5. 构建和安装插件
```bash
# 构建 gem 包
gem build logstash-input-myinput.gemspec
# 安装插件
bin/logstash-plugin install logstash-input-myinput-1.0.0.gem
```
## 插件配置最佳实践
### 1. 插件顺序
按照数据处理流程合理排列插件顺序:
```
Input → Filter → Output
```
### 2. 条件判断
使用条件语句避免不必要的插件处理:
```conf
filter {
if [type] == "apache" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
}
```
### 3. 错误处理
处理插件执行失败的情况:
```conf
filter {
grok {
match => { "message" => "%{PATTERN:field}" }
tag_on_failure => ["_grokparsefailure"]
}
if "_grokparsefailure" in [tags] {
# 处理解析失败
}
}
```
### 4. 性能优化
- 使用批量处理提高性能
- 避免使用复杂的 Ruby 代码
- 合理配置线程数和批量大小
## 插件版本管理
### 1. 查看插件版本
```bash
bin/logstash-plugin list --verbose | grep logstash-output-s3
```
### 2. 锁定插件版本
在 Gemfile 中指定插件版本:
```ruby
gem "logstash-output-s3", "~> 10.0"
```
### 3. 版本兼容性
确保插件版本与 Logstash 版本兼容。
## 插件测试
### 1. 单元测试
```ruby
# spec/inputs/myinput_spec.rb
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/myinput"
describe LogStash::Inputs::Myinput do
it "should register" do
input = LogStash::Inputs::Myinput.new("port" => 1234)
expect { input.register }.not_to raise_error
end
end
```
### 2. 集成测试
使用测试数据验证插件功能。
## 社区插件
Logstash 社区提供了大量第三方插件,可以通过以下方式查找:
- Logstash 官方插件仓库
- GitHub 搜索
- Elastic 社区论坛
## 最佳实践
1. **选择合适的插件**:根据需求选择最适合的插件
2. **保持插件更新**:定期更新插件以获得最新功能和修复
3. **测试插件**:在生产环境使用前充分测试插件
4. **监控插件性能**:监控插件的性能指标
5. **文档记录**:记录自定义插件的使用方法和配置
服务端 · 2月21日 15:52
如何优化 Logstash 的性能,有哪些常见的优化策略?Logstash 的性能优化是一个重要的话题,特别是在处理大量日志数据时。以下是几个关键的优化策略。
## 1. JVM 内存配置
### 堆内存设置
Logstash 运行在 JVM 上,合理的堆内存配置至关重要:
```bash
# 在 config/jvm.options 中设置
-Xms2g
-Xmx2g
```
**最佳实践**:
- 堆内存不要超过系统物理内存的 50%
- 设置 Xms 和 Xmx 相同值,避免动态调整带来的性能损耗
- 对于大数据量场景,建议堆内存设置为 4-8GB
### JVM 参数优化
```bash
# 使用 G1 垃圾回收器
-XX:+UseG1GC
# 设置 GC 线程数
-XX:ConcGCThreads=2
-XX:ParallelGCThreads=4
# 设置新生代比例
-XX:NewRatio=1
```
## 2. Pipeline 配置优化
### Pipeline Workers
```conf
pipeline.workers: 4
```
- 默认值为 CPU 核心数
- 增加 workers 可以提高并行处理能力
- 建议设置为 CPU 核心数的 1-2 倍
### Batch Size
```conf
pipeline.batch.size: 125
```
- 每个 worker 一次处理的批量大小
- 默认值为 125,可根据实际情况调整
- 增大批量大小可以提高吞吐量,但会增加延迟
### Batch Delay
```conf
pipeline.batch.delay: 50
```
- 批量处理的延迟时间(毫秒)
- 默认值为 50ms
- 降低延迟可以提高实时性,但可能降低吞吐量
## 3. 过滤器优化
### 减少不必要的过滤器
```conf
filter {
# 只对特定类型的数据应用过滤器
if [type] == "apache" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
}
```
### 使用条件判断
```conf
filter {
# 避免对已经解析过的数据重复处理
if [parsed] != "true" {
grok {
match => { "message" => "%{PATTERN:field}" }
add_field => { "parsed" => "true" }
}
}
}
```
### 优化 Grok 模式
- 使用更精确的模式,避免贪婪匹配
- 将常用模式放在前面
- 使用多模式匹配时,将最可能匹配的模式放在前面
## 4. 输入输出优化
### 文件输入优化
```conf
input {
file {
path => "/var/log/*.log"
# 从文件末尾开始读取
start_position => "end"
# 禁用 sincedb 文件(仅用于测试)
sincedb_path => "/dev/null"
# 增加读取缓冲区大小
file_completed_action => "delete"
}
}
```
### Elasticsearch 输出优化
```conf
output {
elasticsearch {
hosts => ["http://localhost:9200"]
# 批量提交大小
flush_size => 500
# 批量提交超时时间
idle_flush_time => 1
# 启用压缩
http_compression => true
# 增加连接池大小
pool_max => 10
}
}
```
## 5. 监控和调试
### 启用监控
```conf
# 在 logstash.yml 中配置
http.host: "0.0.0.0"
http.port: 9600
```
### 查看管道统计
```bash
curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
```
### 日志级别调整
```conf
# 在 logstash.yml 中设置
log.level: info
```
## 6. 架构优化
### 使用消息队列
在 Logstash 前后添加消息队列(如 Kafka、RabbitMQ):
- 解耦数据生产者和消费者
- 提供缓冲能力,应对突发流量
- 支持多消费者并行处理
### 集群部署
- 使用多个 Logstash 实例组成集群
- 通过负载均衡器分发流量
- 提高整体处理能力和可用性
### 使用 Beats
- 使用 Filebeat、Metricbeat 等 Beats 轻量级数据采集器
- Beats 资源占用更少,适合在边缘节点部署
- Logstash 专注于数据处理和转换
## 7. 实际案例
### 高吞吐量场景
```conf
# logstash.yml
pipeline.workers: 8
pipeline.batch.size: 500
pipeline.batch.delay: 10
# config/jvm.options
-Xms8g
-Xmx8g
-XX:+UseG1GC
```
### 低延迟场景
```conf
# logstash.yml
pipeline.workers: 4
pipeline.batch.size: 50
pipeline.batch.delay: 5
```
## 性能测试
使用 logstash-input-generator 进行性能测试:
```conf
input {
generator {
lines => ["test line"]
count => 100000
}
}
output {
stdout { codec => dots }
}
```
监控指标:
- Events per second (EPS)
- CPU 使用率
- 内存使用情况
- 网络吞吐量
服务端 · 2月21日 15:52