ELK架构:filebeat-3kafka-2logstash-es-kibana
首先安装filebeat,windows环境加到服务中管理,到filebeat的根目录
filebeat.inputs:
# ========== 日期轮转型日志(只采集今天 + GBK编码) ==========
- type: log
enabled: true
paths:
- C:/jh_plm_log/JH_CustomizedDxTaskLog_*.txt
- C:/jh_plm_log/ProcessCheckHandler_*.txt
- C:/jh_plm_log/202*_log.txt
- C:/jh_plm_log/InterfaceValue_*.txt
- C:/jh_plm_log/JH_MS_PR_ProductRegulatory_*.txt
# 关键:只采集【24小时内】新日志,旧文件直接忽略 ✅
ignore_older: 24h
# 字段(根级别,logstash能直接识别)
fields_under_root: true
fields:
source: plm
# 你的日志编码 GBK ✅
encoding: gbk
close_inactive: 1h
clean_inactive: 72h
# ========== 模块配置 ==========
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
# ========== ES 模板:使用你现有的 jfsc-* ✅
setup.template.name: "jfsc-*"
setup.template.pattern: "jfsc-*"
setup.template.overwrite: true
setup.template.enabled: true
setup.ilm.enabled: false
setup.template.settings:
index.number_of_shards: 1
index.number_of_replicas: 1
setup.kibana:
host: "172.17.167.30:5601"
# ========== 输出到你新建的 Kafka 主题:plm ✅
output.kafka:
enabled: true
hosts: ["172.17.167.178:9092","172.17.167.179:9092","172.17.167.180:9092"]
topic: "plm" # <--- 已改成你新主题
compression: gzip
required_acks: -1
partition.round_robin:
reachable_only: true
# ========== 日志输出 ==========
logging.level: info
logging.to_files: true
logging.files:
path: ./logs
name: filebeat
keepfiles: 7
permissions: 0644在此之前到kafka的服务器中创建主题plm
./kafka-topics.sh --create --bootstrap-server 172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092 --topic plm --partitions 3 --replication-factor 3
./kafka-console-consumer.sh --bootstrap-server 172.17.167.178:9092 --topic plm --from-beginning
./kafka-consumer-groups.sh --list --bootstrap-server 172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092 ##查看消费组
./kafka-topics.sh --list --bootstrap-server 172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092 ##查看主题组如果filebeat正常传输日志,在第二个命令会实时显示传输过来的日志。
以上如果正常,下一步调整logstash,有一个通道文件pipelines.yml logstash会按照这个文件的配置文件去加载,同级目录下创建.conf
input {
kafka {
bootstrap_servers => "172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092"
topics => ["plm"] # 你新建的主题
group_id => "plm-group" # 消费组
auto_offset_reset => "latest" # 只消费最新日志
codec => "json" # 日志是JSON格式
consumer_threads => 1
}
}
filter {
# 去掉多余字段,干净整洁
mutate {
remove_field => [
"[agent]", "[ecs]", "[input]", "[@metadata]",
"[log][file]", "[log][offset]"
]
}
}
output {
# 根据 source: plm 匹配,写入 ES
if [source] == "plm" {
elasticsearch {
hosts => ["172.17.167.30:9200","172.17.167.31:9200","172.17.167.32:9200"]
index => "plm-%{+YYYY.MM.dd}" # 按天生成索引
user => "elastic"
password => "Pwd@Jahwa123"
}
}
# 控制台打印(调试用,可看到日志)
stdout {
codec => rubydebug
}
}多个日志流可以用一个主题,他们用source区区分,在logstash中可以根据不同的source创建不同的索引,在logstash重启之后,在es中

再上图中就能搜到,然后创建一个kibana的数据视图,选择创建数据视图,名称输入首字母+*的通用匹配规则。这样在discover中就能按照名称找到这个日志。
问题:发现Windows server 服务器传输过来的日志,中文存在乱码
记事本打开左上角另外存查看编码格式,如果是utf8则不需要加额外参数,默认utf8.
encoding: gbk ##删除