Background
A statistical analysis job went live in production: it aggregates the CDR (call detail record) data stored in ES by city and account, and extracts, for each city, the list of the top 100,000 users by traffic volume together with their traffic and other related information. The query is as follows:
{
"query": {
"bool": {
"must": [
{
"term": {
"days": "2025-01-19"
}
},
{
"term": {
"bigcitycode": "1"
}
}
]
}
},
"track_total_hits": true,
"size": 0,
"aggs": {
"top_data": {
"multi_terms": {
"terms": [
{
"field": "loginname"
},
{
"field": "class"
},
{
"field": "nasipaddress"
},
{
"field": "ipattribute"
}
],
"size": "100000",
"order": {
"upFlow": "desc"
}
},
"aggs": {
"upFlow": {
"sum": {
"field": "acctinputoctets"
}
},
"downFlow": {
"sum": {
"field": "acctoutputoctets"
}
},
"sessiontimes": {
"sum": {
"field": "sessiontime"
}
}
}
}
}
}
Problem
After the job had been running for a while, it began failing with errors that tripped the ES circuit breaker; other aggregation queries then tripped the breaker as well, disrupting the business. Restarting the ES nodes was used as a temporary workaround.
The application-side error was:
======2024-07-31 11:06:12.691 [RuoyiScheduler_Worker-1] ERROR com.ruoyi.quartz.util.AbstractQuartzJob - [execute,50] - 任務執行異常 - :
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.ruoyi.quartz.util.JobInvokeUtil.invokeMethod(JobInvokeUtil.java:61)
at com.ruoyi.quartz.util.JobInvokeUtil.invokeMethod(JobInvokeUtil.java:33)
at com.ruoyi.quartz.util.QuartzDisallowConcurrentExecution.doExecute(QuartzDisallowConcurrentExecution.java:19)
at com.ruoyi.quartz.util.AbstractQuartzJob.execute(AbstractQuartzJob.java:44)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: org.elasticsearch.client.ResponseException: method [GET], host [http://192.168.100.100:9092], URI [/dailylog/_search?pretty], status line [HTTP/1.1 500 Internal Server Error]
{
"error" : {
"root_cause" : [
{
"type" : "task_cancelled_exception",
"reason" : "cancelled"
},
{
"type" : "task_cancelled_exception",
"reason" : "The parent task was cancelled, shouldn't start any child tasks"
}
],
"type" : "search_phase_execution_exception",
"reason" : "",
"phase" : "fetch",
"grouped" : true,
"failed_shards" : [
{
"shard" : 0,
"index" : ".ds-dailylog-2024.07.30-000642",
"node" : "NNrViOwRQ868_LJEodD5bg",
"reason" : {
"type" : "task_cancelled_exception",
"reason" : "cancelled"
}
},
{
"shard" : 1,
"index" : ".ds-dailylog-2024.07.30-000642",
"node" : "uwaJwxb9RoS1uCKjmubGtQ",
"reason" : {
"type" : "transport_exception",
"reason" : "failure to send",
"caused_by" : {
"type" : "task_cancelled_exception",
"reason" : "The parent task was cancelled, shouldn't start any child tasks"
}
}
}
],
"caused_by" : {
"type" : "circuit_breaking_exception",
"reason" : "[request] Data too large, data for [<reduce_aggs>] would be [9021524103/8.4gb], which is larger than the limit of [9019431321/8.3gb]",
"bytes_wanted" : 9021524103,
"bytes_limit" : 9019431321,
"durability" : "TRANSIENT"
}
},
"status" : 500
}
at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:326)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:296)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:270)
at com.ruoyi.quartz.task.analysisTask.topFlowAnalyze(analysisTask.java:242)
at com.ruoyi.quartz.task.analysisTask$$FastClassBySpringCGLIB$$ae8c1db7.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88)
at com.ruoyi.framework.aspectj.DataSourceAspect.around(DataSourceAspect.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.ruoyi.quartz.task.analysisTask$$EnhancerBySpringCGLIB$$fc8fd480.topFlowAnalyze(<generated>)
... 10 common frames omitted
The ES-side error log was:
[2024-07-31T11:06:08,908][WARN ][o.e.i.b.request ] [es-node-1] [request] New used memory 9046598667 [8.4gb] for data of [<reused_arrays>] would be larger than configured breaker: 9019431321 [8.3gb], breaking
[2024-07-31T11:06:09,010][WARN ][o.e.i.b.request ] [es-node-1] [request] New used memory 9021524103 [8.4gb] for data of [<reduce_aggs>] would be larger than configured breaker: 9019431321 [8.3gb], breaking
[2024-07-31T11:06:12,656][WARN ][r.suppressed ] [es-node-1] path: /dailylog/_search, params: {pretty=, index=dailylog}
org.elasticsearch.action.search.SearchPhaseExecutionException:
at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:661) [elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.FetchSearchPhase$1.onFailure(FetchSearchPhase.java:89) [elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:28) [elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:33) [elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:732) [elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) [elasticsearch-7.13.0.jar:7.13.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:831) [?:?]
Caused by: org.elasticsearch.common.breaker.CircuitBreakingException: [request] Data too large, data for [<reduce_aggs>] would be [9021524103/8.4gb], which is larger than the limit of [9019431321/8.3gb]
at org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker.circuitBreak(ChildMemoryCircuitBreaker.java:65) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker.limit(ChildMemoryCircuitBreaker.java:137) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker.addEstimateBytesAndMaybeBreak(ChildMemoryCircuitBreaker.java:92) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.QueryPhaseResultConsumer$PendingMerges.addEstimateAndMaybeBreak(QueryPhaseResultConsumer.java:272) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.QueryPhaseResultConsumer$PendingMerges.consume(QueryPhaseResultConsumer.java:311) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.QueryPhaseResultConsumer.consumeResult(QueryPhaseResultConsumer.java:110) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.AbstractSearchAsyncAction.onShardResult(AbstractSearchAsyncAction.java:551) [elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.onShardResult(SearchQueryThenFetchAsyncAction.java:99) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.AbstractSearchAsyncAction$1.innerOnResponse(AbstractSearchAsyncAction.java:305) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:34) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:18) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.SearchExecutionStatsCollector.onResponse(SearchExecutionStatsCollector.java:56) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.SearchExecutionStatsCollector.onResponse(SearchExecutionStatsCollector.java:25) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:43) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.action.search.SearchTransportService$ConnectionCountingHandler.handleResponse(SearchTransportService.java:391) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.TransportService$5.handleResponse(TransportService.java:732) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1273) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:291) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundHandler.handleResponse(InboundHandler.java:275) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:128) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:84) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:693) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:129) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:104) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:69) ~[elasticsearch-7.13.0.jar:7.13.0]
at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:63) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:271) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518) ~[?:?]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267) ~[?:?]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[?:?]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[?:?]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[?:?]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:615) ~[?:?]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:578) ~[?:?]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[?:?]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
Analysis and Handling
From the errors above, it appeared that the statistics job needed more memory than was available, tripping the circuit breaker and causing the query to fail.
First attempt
Measure: split the statistics query, turning the original single request into per-city, per-batch queries to reduce the data volume of each request.
Result: after the update went live, the circuit breaker errors reappeared after a period of operation.
Second attempt
Measure: add server memory and increase the ES process heap configuration (see the note on breaker limits below).
Result: about two weeks after the expansion the circuit breaker errors recurred, only a few days later than before.
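For context, the request breaker's ceiling is a percentage of the JVM heap (indices.breaker.request.limit, 60% by default), so adding heap only raises the ceiling. The numbers in the logs are consistent with this: the 8.3gb limit at error time suggests a roughly 14gb heap before the expansion, while the later breaker stats (request limit 14.3gb, parent limit 22.7gb, i.e. 60% and 95%) point to a 24gb heap afterwards. The limit can also be adjusted dynamically, though, as the expansion showed, a higher ceiling only postpones the trip. A sketch, not a recommendation:
PUT /_cluster/settings
{
  "persistent": {
    "indices.breaker.request.limit": "70%"
  }
}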
Root Cause Analysis
Since neither of the first two attempts resolved the problem, a root cause analysis was undertaken. Checking ES node cache usage with the following request showed that usage was modest on every node, with no upward trend:
GET /_cat/nodes?v&h=name,queryCacheMemory,fielddataMemory,requestCacheMemory
name queryCacheMemory fielddataMemory requestCacheMemory
es-node-1 2.3mb 0b 10.9kb
es-node-2 991.3mb 233.1mb 241.8mb
es-node-3 557.3mb 155.6mb 181mb
During the analysis we noticed that each circuit breaker exposes an estimated-usage metric, which can be inspected with the following request:
GET /_nodes/stats/breaker?human
"breakers" : {
"request" : {
"limit_size_in_bytes" : 15461882265,
"limit_size" : "14.3gb",
"estimated_size_in_bytes" : 6426516720,
"estimated_size" : "5.9gb",
"overhead" : 1.0,
"tripped" : 0
},
"fielddata" : {
"limit_size_in_bytes" : 10307921510,
"limit_size" : "9.5gb",
"estimated_size_in_bytes" : 244509384,
"estimated_size" : "233.1mb",
"overhead" : 1.03,
"tripped" : 0
},
"in_flight_requests" : {
"limit_size_in_bytes" : 25769803776,
"limit_size" : "24gb",
"estimated_size_in_bytes" : 68322,
"estimated_size" : "66.7kb",
"overhead" : 2.0,
"tripped" : 0
},
"model_inference" : {
"limit_size_in_bytes" : 12884901888,
"limit_size" : "12gb",
"estimated_size_in_bytes" : 0,
"estimated_size" : "0b",
"overhead" : 1.0,
"tripped" : 0
},
"accounting" : {
"limit_size_in_bytes" : 25769803776,
"limit_size" : "24gb",
"estimated_size_in_bytes" : 3447342,
"estimated_size" : "3.2mb",
"overhead" : 1.0,
"tripped" : 0
},
"parent" : {
"limit_size_in_bytes" : 24481313587,
"limit_size" : "22.7gb",
"estimated_size_in_bytes" : 14780020624,
"estimated_size" : "13.7gb",
"overhead" : 1.0,
"tripped" : 0
}
}
Here estimated_size is the estimated usage of each breaker type. There is normally some headroom between it and the configured limit, and according to the available documentation the breaker may trip once the estimate gets close to the limit, so monitoring was added for this value.
Continuous monitoring then showed that the request breaker's estimated memory rose every time the statistics job ran and never fell back, until the breaker finally tripped.
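To keep each monitoring poll lightweight, a filter_path parameter can narrow the response to just the request breaker estimate per node. A minimal sketch:
GET /_nodes/stats/breaker?human&filter_path=nodes.*.name,nodes.*.breakers.request.estimated_size*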
解決辦法
目前確定是因為統計分析任務導致ES的Request熔斷器預估內存上漲引發的報錯,但是不確定是不是ES的某種機制或者BUG導致任務執行完成后預估內存依然不回降。
Option 1
- Measure: use composite aggregation to query in pages, pull a city's complete data out of ES, then sort the full set locally and take the top entries (see the sketch after this list).
- Pros: reduces the data volume of each query and lowers ES memory pressure.
- Cons: a large amount of data has to be processed locally, which may increase local compute load.
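A minimal sketch of the Option 1 request (index and field names are taken from the queries above; the page size of 10000 is an assumption to be tuned):
GET /dailylog/_search
{
  "size": 0,
  "track_total_hits": false,
  "query": {
    "bool": {
      "must": [
        { "term": { "days": "2025-01-19" } },
        { "term": { "bigcitycode": "1" } }
      ]
    }
  },
  "aggs": {
    "top_data": {
      "composite": {
        "size": 10000,
        "sources": [
          { "loginname": { "terms": { "field": "loginname" } } },
          { "class": { "terms": { "field": "class" } } },
          { "nasipaddress": { "terms": { "field": "nasipaddress" } } },
          { "ipattribute": { "terms": { "field": "ipattribute" } } }
        ]
      },
      "aggs": {
        "upFlow": { "sum": { "field": "acctinputoctets" } },
        "downFlow": { "sum": { "field": "acctoutputoctets" } },
        "sessiontimes": { "sum": { "field": "sessiontime" } }
      }
    }
  }
}
Composite buckets are returned in key order rather than sorted by upFlow, which is exactly why the top-N selection has to happen client-side; that is the drawback listed above.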
Option 2
- Measure: restructure the multi_terms aggregation into a single terms aggregation with sub-aggregations (a top_hits sub-aggregation recovers the remaining fields), as follows:
{
"query": {
"bool": {
"must": [
{
"term": {
"days": "2024-11-04"
}
},
{
"term": {
"bigcitycode": "1"
}
}
]
}
},
"track_total_hits": false,
"size": 0,
"aggs": {
"top_data": {
"terms": {
"field": "loginname",
"size": 100000,
"order": {
"upFlow": "desc"
}
},
"aggs": {
"upFlow": {
"sum": {
"field": "acctinputoctets"
}
},
"downFlow": {
"sum": {
"field": "acctoutputoctets"
}
},
"sessiontimes": {
"sum": {
"field": "sessiontime"
}
},
"basip":{
"top_hits": {
"size": 1,
"_source": ["nasipaddress"]
}
}
}
}
}
}
- Pros: although 100,000 entries are still fetched in one request, the breaker's estimated memory falls back to a normal level once execution completes.
- Result: after adopting this option, the problem was effectively resolved.
后續思考
若生產中遇到需要對大量數據進行聚合分析但不需要全局排序的場景,仍然優先考慮采用Composite分批查詢的方式。
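Each composite response carries an after_key; repeating the request with that value in the after parameter fetches the next page, until a response comes back with no buckets. A second-page sketch (the after values are hypothetical; in practice the same bool filter and sub-aggregations as the first page must be repeated, omitted here for brevity):
GET /dailylog/_search
{
  "size": 0,
  "aggs": {
    "top_data": {
      "composite": {
        "size": 10000,
        "after": {
          "loginname": "user_0001",
          "class": "2",
          "nasipaddress": "10.0.0.1",
          "ipattribute": "1"
        },
        "sources": [
          { "loginname": { "terms": { "field": "loginname" } } },
          { "class": { "terms": { "field": "class" } } },
          { "nasipaddress": { "terms": { "field": "nasipaddress" } } },
          { "ipattribute": { "terms": { "field": "ipattribute" } } }
        ]
      }
    }
  }
}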