開源數據采集工具 logstash(收集日志)/telegraf(收集指標)
Telegraf 是一個用 Go 編寫的代理程序,是收集和報告指標和數據的代理。可收集系統和服務的統計數據,并寫入到 InfluxDB 數據庫。Telegraf 具有內存占用小的特點,通過插件系統開發人員可輕松添加支持其他服務的擴展。
Telegraf是TICK Stack的一部分,是一個插件驅動的服務器代理,用于收集和報告指標。
Telegraf 集成了直接從其運行的容器和系統中提取各種指標,事件和日志,從第三方API提取指標,甚至通過StatsD和Kafka消費者服務監聽指標。
它還具有輸出插件,可將指標發送到各種其他數據存儲,服務和消息隊列,包括InfluxDB,Graphite,OpenTSDB,Datadog,Librato,Kafka,MQTT,NSQ等等。
Telegraf作為數據采集模塊,需要安裝至被監控的目標主機上。Telegraf設計目標是較小的內存使用,通過插件來構建各種服務和第三方組件的metrics收集
Telegraf由4個獨立的插件驅動:
- Input Plugins
輸入插件,收集系統、服務、第三方組件的數據。 - Processor Plugins
處理插件,轉換、處理、過濾數據。 - Aggregator Plugins
聚合插件,數據特征聚合。 - Output Plugins
輸出插件,寫metrics數據。
在平臺監控系統中,可以使用 Telegraf 采集多種組件的運行信息,而不需要自己手寫腳本定時采集,大大降低數據獲取的難度;且 Telegraf 配置極為簡單,只要有基本的 Linux 基礎即可快速上手。Telegraf 按照時間序列采集數據,數據結構中包含時序信息,時序數據庫就是為此類數據設計而來,使用 Influxdb 可以針采集得到的數據完成各種分析計算操作。
telegraf接入influxdb
[root@node1 ~]# wget http://get.influxdb.org/telegraf/telegraf-0.11.1-1.x86_64.rpm [root@node1 ~]# rpm -ivh telegraf-0.11.1-1.x86_64.rpm [root@node1 ~]# systemctl start telegraf
[root@node1 ~]# vim /etc/telegraf/telegraf.conf
## 修改內容如下:
[agent]
## Default data collection interval for all inputs
interval = "10s"
[[inputs.cpu]]
## no configuration options required
[[inputs.mem]]
## no configuration options required
[[outputs.influxdb]] urls = ["http://localhost:8086"] # required database = "telegraf" # required retention_policy = "" precision = "s" timeout = "5s" username = "telegraf" password = "password" [root@node1 ~]# systemctl restart telegraf
docker-compose.yml
version: '3' services: telegraf: image: telegraf volumes: - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
添加各種設備(SNMP、HTTP、MQTT、OPC UA)并擴展Telegraf配置:
[[inputs.opcua]] name = "opcua" endpoint = "opc.tcp://opcua_server:4840/freeopcua/server/" security_policy = "None" security_mode = "None" auth_method = "Anonymous" nodes = [ { name = "status", namespace = "2", identifier_type = "i", identifier = "2" } ] [[inputs.http_response]] interval = "10s" name_override = "http_metrics" urls = ["http://http_api:8080/metrics"] method = "GET" response_timeout = "5s" data_format = "json" [[inputs.snmp]] agents = ["snmp_agent"] version = 2 community = "public" interval = "10s" timeout = "5s" [[inputs.snmp.field]] name = "cpu" oid = "1.3.6.1.4.1.2021.11.11.0" [[inputs.snmp.field]] name = "memory" oid = "1.3.6.1.4.1.2021.4.6.0" [[inputs.mqtt_consumer]] servers = ["tcp://mqtt_broker:1883"] topics = ["sensor/cpu/#", "sensor/mem/#"] data_format = "value" data_type = "float"
# 從RabbitMQ Management API獲取指標
[[inputs.rabbitmq]]
## RabbitMQ Management API 的URL
url = "http://localhost:15672"
username = "telegraf"
password = "your_secure_password"
## 要采集的指標類型
# 采集節點指標(內存、磁盤等)
collect = ["connections", "queues", "exchange", "node", "overview"]
## 可選:只監控特定的隊列,使用正則匹配
# queues_include = [".*"] # 監控所有隊列
# queues_include = ["important_queue", "task_.*"] # 監控指定隊列
[inputs.rabbitmq.tags]
environment = "staging"
source = "rabbitmq-cluster"
# 從Kafka主題消費消息
[[inputs.kafka_consumer]]
## Kafka Broker 地址
brokers = ["localhost:9092"]
## 要消費的主題列表
topics = ["app_metrics", "server_stats"]
## 消費者組ID,用于偏移量管理
consumer_group = "telegraf_consumers"
## 數據格式:Kafka消息是二進制的,需要告訴Telegraf如何解析
## 假設你的Kafka消息是JSON格式的InfluxDB Line Protocol
data_format = "influx"
## 如果消息是JSON格式,但不是Line Protocol,可以這樣解析:
# data_format = "json"
# json_query = "" # 如果JSON不是數組,需要指定一個查詢來定位數據
# tag_keys = ["host", "region"] # 指定哪些JSON字段作為Tag
# json_string_fields = ["message"] # 指定哪些字段作為String類型字段
## 可選:連接Kafka的認證信息(如果Kafka需要SASL/SSL)
# sasl_username = "kafka-user"
# sasl_password = "kafka-password"
[inputs.kafka_consumer.tags]
source = "kafka-cluster-1"
data_topic = "app_metrics"
# 從PostgreSQL服務器獲取指標
[[inputs.postgresql]]
## 指定連接地址,可以同時監控多個數據庫實例
address = "host=localhost user=telegraf password=your_secure_password sslmode=disable"
## 可選:指定要連接的數據庫,默認為 'postgres'
# databases = ["app_db", "postgres"]
## 要采集的指標
# 采集數據庫大小、表統計、連接數等
# 采集詳細的查詢統計(需要pg_stat_statements)
[[inputs.postgresql.query]]
query="SELECT * FROM pg_stat_database"
measurement="pg_stat_database"
[[inputs.postgresql.query]]
query="SELECT * FROM pg_stat_statements"
measurement="pg_stat_statements"
# 這個查詢可能返回大量數據,建議啟用標簽限制
# withdbname = false
# taglimit = 10
[inputs.postgresql.tags]
environment = "production"
source = "postgres-primary"
# 讀取MySQL的指標和統計信息
[[inputs.mysql]]
## 指定MySQL服務器連接地址,%s會被替換為下面的數據庫名
servers = ["tcp(127.0.0.1:3306)/"]
## 步驟1中創建的監控用戶和密碼
username = "telegraf"
password = "your_secure_password"
## 要采集的指標列表
# 采集全局狀態
metric_types = ["global_status", "innodb_metrics", "binary_logs", "table_schema", "user_statistics"]
# 可選:指定要監控的數據庫,如果為空則監控所有
# databases = ["app_db", "test_db"]
# 表架構指標采集的時間間隔(較慢,可以設置長一些)
interval_slow = "30m"
## 可選:添加標簽,便于在InfluxDB中篩選
[inputs.mysql.tags]
environment = "production"
source = "mysql-primary"
[[inputs.postgresql_extensible]]
address = "host=localhost user=telegraf password=your_secure_password dbname=your_database_name"
## 自定義查詢 1:訂單統計
[[inputs.postgresql_extensible.query]]
measurement = "order_metrics"
sql = """SELECT
COUNT(*) as total_orders,
SUM(amount) as daily_revenue,
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending_orders,
date_trunc('hour', created_at) as time
FROM orders
WHERE created_at >= NOW() - INTERVAL '1 hour'
GROUP BY time"""
# 將 'time' 字段作為時間戳
timestamp = "time"
## 自定義查詢 2:產品庫存監控
[[inputs.postgresql_extensible.query]]
measurement = "inventory"
sql = """SELECT
product_id,
product_name,
quantity_in_stock,
(quantity_in_stock < low_stock_threshold) as is_low_stock
FROM products"""
# 不指定 timestamp,使用 Telegraf 采集時間作為時間戳
[[inputs.mysql]]
servers = ["tcp(localhost:3306)/your_database_name"]
username = "telegraf"
password = "your_secure_password"
# 禁用默認的監控指標采集(可選,如果只想采集業務數據)
# metric_types = []
## 自定義指標查詢
[[inputs.mysql.metric_query]]
# 查詢名稱,會作為 measurement 名稱
measurement = "user_metrics"
# 自定義 SQL 查詢
query = """SELECT
COUNT(*) as total_users,
COUNT(CASE WHEN created_at >= CURDATE() THEN 1 END) as new_users_today,
DATE(created_at) as date
FROM users
GROUP BY DATE(created_at)"""
# 指定時間戳字段(可選)
# timestamp = "date"
[[outputs.influxdb_v2]] urls = ["http://influxdb:8086"] token = "replace-with-your-own-token" organization = "test-org" bucket = "metrics"
Telegraf 是 InfluxData 平臺的核心數據采集組件。它本身不是一個數據源,而是一個擁有超過200個插件的采集器,可以從海量的數據源中拉取或接收數據,然后寫入到 InfluxDB。
-
接入方式:通過配置 Telegraf 的 Input Plugins 和 Output Plugins。
-
支持的數據源(通過Telegraf):
-
系統指標:CPU、內存、磁盤、網絡、進程(通過
cpu,mem,disk,net等插件)。 -
數據庫:MySQL, PostgreSQL, MongoDB, Redis, Elasticsearch, SQL Server, Oracle 等。
-
消息隊列:Kafka, RabbitMQ, MQTT(也可作為輸入)。
-
云服務:AWS CloudWatch, Google Cloud Monitoring, Azure Monitor。
-
容器與編排:Docker, Kubernetes。
-
日志文件:通過
tail插件讀取日志文件。 -
API 數據:通過
http插件從任何提供 JSON/XML 等格式的 REST API 拉取數據。 -
網絡設備:通過
snmp插件采集網絡設備指標。 -
硬件傳感器:通過
sensors插件讀取主板傳感器數據。
-
| 特性 | Telegraf | Logstash |
|---|---|---|
| 核心定位 | 指標收集代理 | 日志處理管道 |
| 開發背景 | InfluxData(時間序列數據庫廠商) | Elastic(搜索和分析引擎廠商) |
| 架構設計 | 基于插件的代理,輕量級 | 基于 JVM 的完整處理管道 |
| 資源消耗 | 低內存(通常 10-50MB) | 高內存(通常 500MB-1GB+) |
| 性能 | 高吞吐,低延遲 | 中等吞吐,處理能力強 |
| 數據模型 | 主要為指標和時序數據 | 主要為日志和事件數據 |
| 配置復雜度 | 簡單直觀的 TOML 配置 | 靈活的 Ruby DSL 配置 |
Telegraf:
-
為時間序列數據而生,專門優化用于指標收集
-
與 InfluxDB 緊密集成,但支持多種輸出
-
"Batteries included" 理念 - 開箱即用
Logstash:
-
為日志處理而生,是 ELK/ELK Stack 的核心組件
-
強調數據的解析、轉換和豐富
-
"Pipeline" 理念 - 靈活可擴展的數據管道

浙公網安備 33010602011771號