apisix~kafka-logger插件
參考
- https://github.com/apache/apisix/issues/5719#issuecomment-2694062542
- https://lists.apache.org/thread/rls3s8kxkx83wl64z62rgbj1ch8wk8v1
作用
將http請求與響應(yīng)的內(nèi)容發(fā)到kafka的topic,以json的形式發(fā)送存儲
配置相關(guān)
- log_format為自定義配置字段,添加后,默認(rèn)的請求響應(yīng)消息將被覆蓋
- kafka連接串修改后,需要重新啟動apisix服務(wù)
- include_req_body 是否包含請求體
- include_resp_body 是否包含響應(yīng)體,默認(rèn)插件未實現(xiàn),需要自己實現(xiàn)它
- max_req_body_bytes 最大限制請求體大小,默認(rèn)524288(512K),
- max_resp_body_bytes 最大限制響應(yīng)體大小,默認(rèn)524288(512K),
{
"_meta": {
"disable": false
},
"batch_max_size": 1,
"brokers": [
{
"host": "192.168.10.132",
"port": 9091
},
{
"host": "192.168.10.133",
"port": 9092
},
{
"host": "192.168.10.134",
"port": 9097
}
],
"disable": false,
"include_req_body": false,
"include_resp_body": false,
"max_req_body_bytes": 524288,
"max_resp_body_bytes": 524288,
"kafka_topic": "apisix-logger-test",
"log_format": {
"accessPage": "$http_accessPage",
"accessSource": "$http_accessSource",
"api": "$request_uri",
"appVersion": "$http_appVersion",
"eventsId": "$http_eventsId",
"groupName": "$http_groupName",
"groupUserName": "$http_groupUserName",
"logIp": "$http_loginIp",
"loginType": "$http_loginType",
"method": "$request_method",
"requestId": "$http_requestId",
"sessionId": "$http_sessionId",
"status": "$status",
"uniqueDeviceIdentifier": "$http_uniqueDeviceIdentifier",
"userId": "$http_sub",
"userName": "$http_preferred_username"
},
"producer_type": "async",
"required_acks": 1
}
$開頭表示是系統(tǒng)變量$http_開頭的,表示是從請求頭里獲取- 如果獲取字段的內(nèi)容為空,則不會出現(xiàn)在消息體里
- kafka里存儲的消息為以下內(nèi)容
{
"now": "2024-09-10T02:01:22+00:00",
"route_id": "528226539821597458",
"api": "/pkulaw-chat-gpt-api-4/6.0.0.0/gpt/writer/ping",
}
上面配置,是直接在路由或者全局插件配置中,可以通過apisix-dashboard進(jìn)行配置,使用log_format對記錄的信息進(jìn)行了設(shè)置,我們不建議自定義log_format,因為默認(rèn)的就是最全的信息
不使用log_format,默認(rèn)的消息體內(nèi)容如下
[
{
"client_ip": "192.168.60.136",
"upstream": "10.42.4.236:8080",
"apisix_latency": 7.0000324249268,
"start_time": 1716198064095,
"latency": 16.000032424927,
"request": {
"uri": "/kpi/hello",
"method": "GET",
"size": 1757,
"url": "http://test-apisix.pkulaw.com:9080/kpi/hello",
"headers": {
"accept": "text/html,application/xhtml+xml,application/xml;q\u003d0.9,image/avif,image/webp,*/*;q\u003d0.8",
"connection": "close",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
"cookie": "Hm_lvt_8266968662c086f34b2a3e2ae9014bf8\u003d1715752532; Hm_up_8266968662c086f34b2a3e2ae9014bf8",
"sec-fetch-dest": "document",
"upgrade-insecure-requests": "1",
"accept-language": "zh-CN,en;q\u003d0.8,en-US;q\u003d0.7,zh;q\u003d0.5,zh-TW;q\u003d0.3,zh-HK;q\u003d0.2",
"sec-fetch-mode": "navigate",
"accept-encoding": "gzip, deflate, br",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"host": "test-apisix.pkulaw.com",
"x-forwarded-for": "12.19.143.194"
},
"querystring": {}
},
"response": {
"status": 200,
"headers": {
"server": "APISIX/3.8.0",
"content-type": "application/json",
"x-ratelimit-limit": "5",
"transfer-encoding": "chunked",
"connection": "close",
"date": "Mon, 20 May 2024 09:41:04 GMT",
"x-ratelimit-reset": "60",
"x-ratelimit-remaining": "4"
},
"size": 2064
},
"route_id": "513923429800346372",
"upstream_latency": 9,
"service_id": "",
"server": {
"version": "3.8.0",
"hostname": "apisix-78bcfb45c6-26746"
}
}
]
在log_format的同步添加擴(kuò)展字段
- 這需要修改kafka-logger的源碼了
- 獲取user-agent中的操作系統(tǒng),瀏覽器等信息,你可以添加下面的方法
local function parse_user_agent(user_agent)
local os, browser, version
-- 檢測操作系統(tǒng)
if user_agent:find("Windows") then
os = "Windows"
elseif user_agent:find("Macintosh") then
os = "Mac OS"
elseif user_agent:find("Linux") then
os = "Linux"
elseif user_agent:find("Android") then
os = "Android"
elseif user_agent:find("iPhone") then
os = "iOS"
end
-- 檢測瀏覽器
if user_agent:find("Chrome") then
browser = "Chrome"
version = user_agent:match("Chrome%/(%d+%.%d+)")
elseif user_agent:find("Firefox") then
browser = "Firefox"
version = user_agent:match("Firefox%/(%d+%.%d+)")
elseif user_agent:find("Safari") then
browser = "Safari"
version = user_agent:match("Version%/(%d+%.%d+)")
elseif user_agent:find("MSIE") then
browser = "Internet Explorer"
version = user_agent:match("MSIE (%d+%.%d+)")
elseif user_agent:find("Trident") then
browser = "Internet Explorer"
version = user_agent:match("rv:(%d+%.%d+)")
end
return {
os = os or "Unknown",
browser = browser or "Unknown",
version = version or "Unknown"
}
end
在kafka-logger的_M.log(conf, ctx)方法中,添加瀏覽器擴(kuò)展字段
function _M.log(conf, ctx)
local entry
if conf.meta_format == "origin" then
entry = log_util.get_req_original(ctx, conf)
else
entry = log_util.get_log_entry(plugin_name, conf, ctx)
end
-- 添加擴(kuò)展字段開始
local user_agent = ngx.var.http_user_agent
local info = parse_user_agent(user_agent)
entry.os = info.os
entry.browser=info.browser
entry.version=info.version
-- 添加擴(kuò)展字段結(jié)束
if batch_processor_manager:add_entry(conf, entry) then
return
end
...
記錄響應(yīng)體
在_M.log(conf, ctx)方法中方法代碼
-- 響應(yīng)體
if conf.include_resp_body then
local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY
local resp_body = ctx.resp_body
if #resp_body > max_resp_body_bytes then
resp_body = str_sub(resp_body, 1, max_resp_body_bytes)
end
entry.responseBody = resp_body
core.log.warn("response data:", resp_body)
end
浙公網(wǎng)安備 33010602011771號