孤九沫
发布于 2026-05-12 / 10 阅读
0
0

ELK接入实践

ELK架构:filebeat-3kafka-2logstash-es-kibana

首先安装filebeat,windows环境加到服务中管理,到filebeat的根目录

filebeat.inputs:
# ========== 日期轮转型日志(只采集今天 + GBK编码) ==========
- type: log
  enabled: true
  paths:
    - C:/jh_plm_log/JH_CustomizedDxTaskLog_*.txt
    - C:/jh_plm_log/ProcessCheckHandler_*.txt
    - C:/jh_plm_log/202*_log.txt
    - C:/jh_plm_log/InterfaceValue_*.txt
    - C:/jh_plm_log/JH_MS_PR_ProductRegulatory_*.txt

  # 关键:只采集【24小时内】新日志,旧文件直接忽略 ✅
  ignore_older: 24h


  # 字段(根级别,logstash能直接识别)
  fields_under_root: true
  fields:
    source: plm

  # 你的日志编码 GBK ✅
  encoding: gbk

  close_inactive: 1h
  clean_inactive: 72h

# ========== 模块配置 ==========
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true

# ========== ES 模板:使用你现有的 jfsc-* ✅
setup.template.name: "jfsc-*"
setup.template.pattern: "jfsc-*"
setup.template.overwrite: true
setup.template.enabled: true
setup.ilm.enabled: false

setup.template.settings:
  index.number_of_shards: 1
  index.number_of_replicas: 1

setup.kibana:
  host: "172.17.167.30:5601"

# ========== 输出到你新建的 Kafka 主题:plm ✅
output.kafka:
  enabled: true
  hosts: ["172.17.167.178:9092","172.17.167.179:9092","172.17.167.180:9092"]
  topic: "plm"          # <--- 已改成你新主题
  compression: gzip
  required_acks: -1
  partition.round_robin:
    reachable_only: true

# ========== 日志输出 ==========
logging.level: info
logging.to_files: true
logging.files:
  path: ./logs
  name: filebeat
  keepfiles: 7
  permissions: 0644

在此之前到kafka的服务器中创建主题plm

./kafka-topics.sh --create --bootstrap-server 172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092 --topic plm --partitions 3 --replication-factor 3

./kafka-console-consumer.sh --bootstrap-server 172.17.167.178:9092 --topic plm --from-beginning

./kafka-consumer-groups.sh --list --bootstrap-server 172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092   ##查看消费组
./kafka-topics.sh --list --bootstrap-server 172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092 ##查看主题组

如果filebeat正常传输日志,在第二个命令会实时显示传输过来的日志。

以上如果正常,下一步调整logstash,有一个通道文件pipelines.yml logstash会按照这个文件的配置文件去加载,同级目录下创建.conf

input {
  kafka {
    bootstrap_servers => "172.17.167.178:9092,172.17.167.179:9092,172.17.167.180:9092"
    topics => ["plm"]                  # 你新建的主题
    group_id => "plm-group"            # 消费组
    auto_offset_reset => "latest"       # 只消费最新日志
    codec => "json"                    # 日志是JSON格式
    consumer_threads => 1
  }
}

filter {
  # 去掉多余字段,干净整洁
  mutate {
    remove_field => [
      "[agent]", "[ecs]", "[input]", "[@metadata]",
      "[log][file]", "[log][offset]"
    ]
  }
}

output {
  # 根据 source: plm 匹配,写入 ES
  if [source] == "plm" {
    elasticsearch {
      hosts => ["172.17.167.30:9200","172.17.167.31:9200","172.17.167.32:9200"]
      index => "plm-%{+YYYY.MM.dd}"     # 按天生成索引
      user => "elastic"
      password => "Pwd@Jahwa123"
    }
  }

  # 控制台打印(调试用,可看到日志)
  stdout {
    codec => rubydebug
  }
}

多个日志流可以用一个主题,他们用source区区分,在logstash中可以根据不同的source创建不同的索引,在logstash重启之后,在es中

再上图中就能搜到,然后创建一个kibana的数据视图,选择创建数据视图,名称输入首字母+*的通用匹配规则。这样在discover中就能按照名称找到这个日志。

问题:发现Windows server 服务器传输过来的日志,中文存在乱码

记事本打开左上角另外存查看编码格式,如果是utf8则不需要加额外参数,默认utf8.

  encoding: gbk    ##删除


评论