一、 問題:在使用Splunk做可視化面板【關系圖】和【桑基圖】的時候,想要過濾掉與某個節點不相關的數據,比如:
查看與src=110.68.0.178相關的數據的關系圖:
測試數據:src-dest.csv
| src | dest | count |
| 107.159.0.32 | 157.28.151.20 | 81 |
| 110.68.0.178 | 157.28.19.184 | 69 |
| 110.68.0.178 | 157.28.151.50 | 172 |
| 152.115.104.104 | 121.138.108.76 | 234 |
| 152.115.104.104 | 28.185.93.35 | 120 |
| 112.72.128.231 | 157.28.151.50 | 175 |
| 118.234.0.179 | 157.28.19.184 | 85 |
| 118.234.0.179 | 157.28.151.50 | 171 |
| 119.10.184.222 | 157.28.12.66 | 175 |
| 28.0.64.67 | 157.28.12.66 | 173 |
| 125.38.92.28 | 11.19.51.10 | 197 |
| 125.38.92.28 | 116.195.40.100 | 115 |
| 138.214.0.44 | 157.28.151.20 | 346 |
| 14.194.0.151 | 157.28.12.5 | 430 |
| 155.35.233.233 | 157.28.151.60 | 171 |
| 160.243.0.39 | 157.28.12.66 | 246 |
| 183.91.192.126 | 157.28.14.50 | 411 |
| 183.91.192.126 | 157.28.151.50 | 510 |
| 195.163.79.25 | 121.138.108.81 | 36 |
關系圖:

桑基圖:

二 、方法:
使用 自定義命令 diagramsinge
SPL搜索語句:| inputlookup src-dest.csv |stats sum(count) as total by src dest | diagramsinge src_field=src dest_field=dest value=110.68.0.178 iter_mode=all
三 、結果:


四、實現過程
a、編寫命令腳本diagramsinge.py
路徑 $SPLUNK_HOME/etc/apps/{app_name}/bin
腳本
#!/usr/bin/env python # coding=utf-8 # # Copyright ? 2011-2015 Splunk, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"): you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function, unicode_literals # import app import sys import os import logging as logger #logger.basicConfig(level=logger.INFO, format='%(asctime)s %(levelname)s %(message)s', # filename=os.path.join('/opt/splunk/', 'test.log'), filemode='a') from splunklib.searchcommands import dispatch, EventingCommand, Configuration, Option, validators from splunklib import six def formattsinge(sourcedata, filterarray, max_order, results, src, dest,iter_mode): filterarray2 = [] sourcedata2 = [] unsourcedata2 = {} limit_bool = True if max_order <= -1 or max_order > 0 else False if filterarray and sourcedata and limit_bool: for row in sourcedata: haved_bool = True for filter in filterarray: if haved_bool: if iter_mode == 'src' or iter_mode == 'all': if filter == row[src]: filterarray2.append(row[dest]) results.append(row) unsourcedata2[row[src] + row[dest]] = row haved_bool = False if iter_mode == 'dest' or iter_mode == 'all': if filter == row[dest]: filterarray2.append(row[src]) results.append(row) unsourcedata2[row[src] + row[dest]] = row haved_bool = False max_order -= 1 unsourcedata2_keys = unsourcedata2.keys() for row1 in sourcedata: if row1[src] + row1[dest] not in unsourcedata2_keys: sourcedata2.append(row1) formattsinge(sourcedata2, filterarray2, max_order, results, src, dest,iter_mode) else: return results return results def formattsinge_sort(sourcedata, filterarray, max_order, results, src, dest, sort_field,iter_mode): filterarray2 = [] sourcedata2 = [] unsourcedata2 = {} limit_bool = True if max_order == -1 or max_order > 0 else False if filterarray and sourcedata and limit_bool and len(filterarray) < max_order: for row in sourcedata: haved_bool = True for filter in filterarray: if haved_bool: filters = filter.split('_') if len(filter.split('_')) > 1 else [filter, 0] if iter_mode == 'src' or iter_mode == 'all': if filters[0] == row[src] and filters[1] < row[sort_field]: filterarray2.append(row[dest] + '_' + row[sort_field]) results.append(row) unsourcedata2[row[src] + row[dest]] = row haved_bool = False if iter_mode == 'dest' or iter_mode == 'all': if filters[0] == row[dest] and filters[1] > row[sort_field]: filterarray2.append(row[src] + '_' + row[sort_field]) results.append(row) unsourcedata2[row[src] + row[dest]] = row haved_bool = False max_order -= 1 unsourcedata2_keys = unsourcedata2.keys() for row1 in sourcedata: if row1[src] + row1[dest] not in unsourcedata2_keys: sourcedata2.append(row1) formattsinge_sort(sourcedata2, filterarray2, max_order, results, src, dest, sort_field,iter_mode) else: return results return results @Configuration() class DiagramSingeCommand(EventingCommand): # self.logger.debug("command start run ...") limit = Option(require=False, validate=validators.Integer()) src_field = Option(require=True, validate=validators.Fieldname()) dest_field = Option(require=True, validate=validators.Fieldname()) sort_field = Option(require=False, validate=validators.Fieldname()) value = Option(require=True) iter_mode = Option(require=False) def transform(self, records): self.logger.debug('CountMatchesCommand: %s', records) # logs command line src_field = self.src_field dest_field = self.dest_field sort_field = self.sort_field value = [str(self.value)] results_default = [] iter_mode = self.iter_mode if self.iter_mode and self.iter_mode in ['src', 'dest', 'all'] else 'src' limit = self.limit if self.limit else 1000 records_list = [] for r in records: records_list.append(r) if sort_field: records = formattsinge_sort(records_list, value, limit, results_default, src_field, dest_field,sort_field,iter_mode) else : records = formattsinge(records_list, value, limit, results_default, src_field, dest_field,iter_mode) for record in records: yield record dispatch(DiagramSingeCommand, sys.argv, sys.stdin, sys.stdout, __name__)
b、配置文件:commands.conf
路徑: $SPLUNK_HOME/etc/apps/{app_name}/local
[diagramsinge]
filename = diagramsinge.py
supports_getinfo = false
supports_rawargs = true
chunked = true
c、配置文件:searchbnf.conf
路徑: $SPLUNK_HOME/etc/apps/{app_name}/local
[diagramsinge-command] syntax = diagramsinge src_field=<field_name> dest_field=<field_name> value=<value> limit=<int> sort_field=<field_name> iter_mode=<mode> alias = shortdesc = 格式化統計結果. description = \ 追溯與指定源關聯的結果 comment1 = \ 運行此命令后會修改事件告警狀態. example1 = | stats count by src dest \ | diagramsinge src_field="src" dest_field="dest" value="127.0.0.1" appears-in = 1.5 maintainer = dnoble usage = public tags = stats
命令注解*
名稱:diagramsinge
描述:diagramsinge命令類型為Dataset processing,Dataset processing命令要求整個數據集就位,然后命令才能運行。這些命令不進行轉換,不進行分發,不進行流傳輸并且不進行編排。diagramsinge通過對輸入的數據及給定的參數對數據進行過濾。
用例:| inputlookup src-dest.csv |stats sum(count) as total by src dest | diagramsinge src_field=src dest_field=dest value=110.68.0.178 iter_mode=all
起始節點:src,目的節點:dest,搜索條件為:110.68.0.178,模式:all,查詢所有與110.68.0.178有直接或間接關聯的數據。
| inputlookup src-dest.csv |stats sum(count) as total max(_time) as time by src dest | diagramsinge src_field=src dest_field=dest value=110.68.0.178 iter_mode=src sort_field=time
起始節點:src,目的節點:dest,搜索條件為:110.68.0.178,模式:src,排序:time,查詢所有src=110.68.0.178的數據,src=110.68.0.178時對應的dest作為src,以此類推。
參數:src_field
【字段名稱】 指定起始節點的字段名
dest_field
【字段名稱】 指定目的節點的字段名
value
【字段值】 指定關注的節點值
iter_mode
src (默認)將關注的節點值作為起始節點,并將其對應的目的節點作為起始節點,以此類推。
dest 將關注的節點值作為目的節點,并將其對應的起始節點作為目的節點,以此類推。
all 將關注的節點值作為起始節點和目的節點,并將其對應的節點作為起始節點及目的節點,以此類推。
sort_field
【字段名稱】(測試)依據指定字段的值對結果排序,以使每個鏈路在時間上連續,注意會排除時續不一致的結果。
limit
【數值】(默認1000)限制節點計算次數。
使用條件:在自定義diagramsinge,需要有列出節點字段
| stats count by src dest
或者
| table src dest count
參考:
https://dev.splunk.com/enterprise/docs/devtools/customsearchcommands/
https://docs.splunk.com/Documentation/Splunk/latest/Search/Typesofcommands
https://github.com/splunk/splunk-sdk-python/tree/master/examples/searchcommands_template/bin
浙公網安備 33010602011771號