您的位置:首页 > 编程语言 > Python开发

Splunk二次开发使用Python 编写自定义搜索命令

2018-03-31 21:56 746 查看

前言

本文的目标是让读者对Splunk编写自定义搜索命令有个基本的概念,并不是详尽的开发指南。

自定义搜索命令简介

Splunk Spl语言是将Splunk的一系列搜索命令组织成数据处理的管道,如下图所示,提供了140多种搜索命令,基本覆盖了日常对数据处理的各种场景。其中search命令是SPL语言的默认命令,可以不明确的写在语句中。
 


 
但是在日常使用的过程中,有很多特殊的应用场景中,我们需要根据业务逻辑对数据进行处理,那么就需要一种机制,能够灵活的添加新的命令,满足业务需求,如下图所示,GOCRAZY命令就是我们编写的自定义搜索命令。

 

自定义搜索命令的执行原理

SPL的执行过程

将SPL查询语句分解为命令


分析语句中是否存在自定义搜索命令


获取外部资源路径


外部进程解析自定义命令参数


使用外部命令处理数据


自定义搜索命令的抽象概念

能够编写新的搜索命令,扩展SPL语言,实现新的数据分析。
能够使我们对搜索结果进行处理,例如接收csv结构的数据,进过处理之后,输出csv格式的数据。
实现为外部进程(一个我们编写的程序),最典型的是编写python脚本。

自定义搜索命令的底层细节

Splunkd和外部进程的交互协议

因为自定义搜索命令运行在外部进程中,所以splunkd需要与外部进程之间进行交互,splunk定义了两种交互协议:
协议V1,历史遗留协议,始于Splunk3.0开始,主要被InterSplunk.py使用。当搜索结果中的数据量非常大的时候,需要拆分成一批一批的被自定义命令处理。协议V1会对每一批数据启动一次外部进程,效率较低。
转换命令(事件命令、报表命令)的事件上限是50,000。"Transforming" commands limited to 50,000 events

协议V2,始于Splunk6.3版本中添加的新协议,主要在Python SDK中使用。两种协议都是用std in/std out进行交互。
协议V2只启动外部进行一次,每一批数据会一次发送给外部进程进行处理,只需要在commands.conf中设置chunked=true即可。
可以支持非python平台开发的程序作为自定义命令。

功能对比

 
协议
API
性能
扩展性
简单配置(配chunk=true)
是否支持其他编程语言实现
编程语言
V1
Intersplunk.py,
Python SDK




Python
V2
Python SDK



支持
Python,
任意实现了协议的二进制程序
 

性能对比


 如图中所示,使用Splunk原生命令,V2协议、V1协议实现的自定义命令,对250W数据进行处理所消耗的事件,单位是秒。使用V2协议实现的自定义命令的效率略低于使用Splunk原生命令,使用V1协议明显效率降低非常多。

搜索命令协议V2

Splunk搜索命令V2是调用响应的执行模式,即splunkd调用命令,发送数据,外部进程发挥处理结果。从技术细节上讲,splunkd和外部进程之间使用了简单的双向传输协议:
协议头是ASCII编码。
协议元数据部分使用的是JSON结构。
数据体部分使用的是CSV结构。
每一个seach都首先执行getinfo命令,然后执行execute处理结果。
Splunkd与外部进程之间通讯数据块数据结构
 

 

协议V2示例:GOCRAZY

 

 

交互过程

 

 
Splunkd首先启动外部进行,获取外部进程命令的相关信息,然后将搜索结果集拆分成一批一批的发送给外部进行处理,处理完一批将结果返回给Splunkd,Splunkd发送下一批数据,如此往复,直到搜索结果集处理完毕,销毁外部进程。

getInfo命令

splunkd发送的getInfo命令的Metadata

命令参数
完整的SPL查询字符串
执行上下文(app,用户)
搜索sid
splunkd URI和授权令牌(用于制作REST请求)

自定义命令返回的Metadata

搜索命令的类型(流式命令、有状态流式命令、事件命令、报表命令等)Type of search command (streaming/stateful/reporting/etc.)
splunkd需要提取那些字段(必须提取的)Which fields splunkd should extract (required fields)
是否生成结果(必须是search中的第一个命令)Whether or not it generates results (e.g. must be first search command)

getInfo获取的Metadata示例

{  
    "action": "getinfo",  
    "streaming_command_will_restart": false,  
    "searchinfo": {  
        "earliest_time": "0",  
        "raw_args": [  
            "LinearRegression", "petal_length", "from", "petal_width"  
        ],  
        "session_key": "...",  
        "maxresultrows": 50000,  
        "args": [  
            "LinearRegression", "petal_length", "from", "petal_width"  
        ],  
        "dispatch_dir": "/Users/jleverich/builds/conf_mlapp_demo/var/run/splunk/dispatch/1475007525.265",  
        "command": "fit",  
        "latest_time": "0",  
        "sid": "1475007525.265",  
        "splunk_version": "6.5.0",  
        "username": "admin",  
        "search": "| inputlookup iris.csv | fit LinearRegression petal_length from petal_width",  
        "splunkd_uri": "https://127.0.0.1:8090",  
        "owner": "admin",  
        "app": "Splunk_ML_Toolkit"  
    },  
    "preview": false  

搜索命令类型

流式命令

流式命令只能一条一条的处理,不能记录全局的状态。
不能也不要对搜索结果进行排序。
可以在indexer上并行执行。
例如:eval、where、rex等命令
流式命令示例

 

有状态流式命令

只能一条一条的处理,可以记录全局的状态。
不能也不要对搜索结果进行排序。
可以在只能在SearchHead上执行。
例如:accum 、streamstats 、dedup等命令。
有状态流式命令示例:


 

事件命令

把搜索结果视为一个整体进行处理
可能会重新排列搜索结果
通常维护每个事件中的所有字段,特别是:_raw,_time,index,sourcetype,source,host
只运行在Search Heard上。
可能会重复执行多次,因为Preview功能。
例如:sort 、eventstats等命令。
事件命令示例:

 

报表命令

把搜索结果视为一个整体进行处理,典型的场景是对数据做统计分析。
只运行在Search Heard上。
可能会重复执行多次,因为Preview功能。
在界面上,结果会展现在"Statistics"标签中。
例如:stats 、timechart 、 transpose等命令
报表命令示例
注意事项:
事件命令和报表命令,将搜索结果集视为一个整体进行处理,而结果集中可能存在数百万条记录,此时使用实现自定义的流式或者有状态的流式命令可能是更好的选择。

编写自定义搜索命令

创建一个App


将Splunk的Python SDK部署到bin目录中cd $PLUNK_HOME/etc/apps/MyNewApp/bin
pip install -t . splunk-sdk
注释:pip 是一个现代的,通用的 Python 包管理工具。提供了对 Python 包的查找、下载、安装、卸载的功能。

为自定义搜索命令编写一个脚本$SPLUNK_HOME/etc/apps/MyNewApp/bin/foobar.py
import sys  
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration  
@Configuration()  
class FoobarCommand(StreamingCommand):  
    def stream(self, records):  
        for record in records:  
            record['foo'] = 'bar'  
            yield record  
            if __name__ == "__main__":  
                dispatch(FoobarCommand, sys.argv, sys.stdin, sys.stdout, __name__)  

在commands.conf中注册自定义搜索命令$SPLUNK_HOME/etc/apps/MyNewApp/default/commands.conf
[foobar] chunked=true   
# filename=foobar.py   
## <--- optional  

重新启动Splunk Enterprise$PLUNK_HOME/bin/splunk restart
(可选)将命令导出到其他App 


 


 
 


 

自定义命令示例

流式命令示例Python代码片段

$SPLUNK_HOME/etc/apps/MyNewApp/bin/exstream.py
import sys  
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration  
@Configuration()  
class ExStreamCommand(StreamingCommand):  
    def stream(self, records):  
        for record in records:  
            record['foo'] = 'bar'  
            yield record  
            if __name__ == "__main__":  
                dispatch(ExStreamCommand, sys.argv, sys.stdin, sys.stdout, __name__) 

有状态流式命令示例Python代码片段

$SPLUNK_HOME/etc/apps/MyNewApp/bin/exstateful.py
import sys  
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration  
@Configuration(local=True)  
class ExStatefulCommand(StreamingCommand):  
    def stream(self, records):  
        for record in records:  
            record['foo'] = 'bar'  
            yield record  
            if __name__ == "__main__":  
                dispatch(ExStatefulCommand, sys.argv, sys.stdin, sys.stdout, __name__)

事件命令示例Python代码片段

$SPLUNK_HOME/etc/apps/MyNewApp/bin/exevent.py
import sys  
from splunklib.searchcommands import dispatch, EventingCommand, Configuration  
@Configuration()  
class ExEventsCommand(EventingCommand):  
    def transform(self, records):  
        l = list(records)  
        l.sort(key=lambda r: r['_raw'])  
        return l  
        if __name__ == "__main__":  
            dispatch(ExEventsCommand, sys.argv, sys.stdin, sys.stdout, __name__)  

报表命令示例Python代码片段

$SPLUNK_HOME/etc/apps/MyNewApp/bin/exreport.py
import sys
from splunklib.searchcommands import dispatch, ReportingCommand, Configuration  
@Configuration()  
class ExReportCommand(ReportingCommand):  
    @Configuration()  
    def map(self, records):  
        return records  
    def reduce(self, records):  
        count = 0  
        for r in records:  
              count += 1  
             return [{'count': count}]  
         if __name__ == "__main__":  
              dispatch(ExReportCommand, sys.argv, sys.stdin, sys.stdout, __name__)  

其他资源

Splunk开发者官方文档
Splunk Python SDK源码
本文中的Python中的示例代码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐