温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Splunk中怎么利用restapi获取数据到第三方系统

发布时间:2021-07-28 17:51:11 来源:亿速云 阅读:135 作者:Leah 栏目:大数据

Splunk中怎么利用restapi获取数据到第三方系统,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

测试

The same python implementation for curl function

'''

sid=`curl -u admin:changeme -k https://localhost:8089/services/search/jobs -d search="search source=\"http:hec_test\" refresh | head 21" 2>/dev/null | sed "1,2d" | sed "2d" | sed "s/.*>\([0-9]*\.[0-9]*\)<.*/\1/"`

echo $sid

curl -u admin:changeme -k https://localhost:8089/services/search/jobs/$sid?output_mode=json

curl -u admin:changeme -k https://localhost:8089/services/search/jobs/$sid/results/ --get -d output_mode=json 2>/dev/null >out.json

'''

基于Python3的封装

# coding=utf-8

import urllib

import httplib2

from xml.dom import minidom

import time

import json

import traceback

class SplunkQuery(object):

    def __init__(self):

        self.baseurl = 'https://IP:8089'

        self.userName = 'xxx'

        self.password = 'xxx'

        self.sessionKey = self.get_key()

    def get_key(self):

        session_key = ""

        try:

            server_content = httplib2.Http(disable_ssl_certificate_validation=True).request(

                self.baseurl + '/services/auth/login', 'POST', headers={}, body=urllib.parse.urlencode({'username': self.userName, 'password': self.password}))[1]

            session_key = minidom.parseString(server_content).getElementsByTagName('sessionKey')[

                0].childNodes[0].nodeValue

        except:

            # traceback.print_exc()

            pass

        return session_key

    def submit_job(self, search_query, earliest_time=None, latest_time=None):

        # check if the query has the search operator

        if not search_query.startswith('search'):

            search_query = 'search ' + search_query

        data = {'search': search_query}

        if earliest_time:

            data['earliest_time'] = earliest_time

        if latest_time:

            data['latest_time'] = latest_time

        sid_body = httplib2.Http(disable_ssl_certificate_validation=True) \

            .request(self.baseurl + '/services/search/jobs',

                    'POST',

                    headers={

                        'Authorization': 'Splunk %s' % self.sessionKey},

                    body=urllib.parse.urlencode(data))[1]

        sid = minidom.parseString(sid_body).getElementsByTagName("sid")[0].childNodes[0].nodeValue

        print("sid:" + sid)

        return sid

    def request_results(self, sid):

        start = time.time()

        response = httplib2.Http(disable_ssl_certificate_validation=True) \

            .request(self.baseurl + '/services/search/jobs/' + sid +

                    "?output_mode=json", 'POST',

                    headers={

                        'Authorization': 'Splunk %s' % self.sessionKey},

                    body=urllib.parse.urlencode({}))[1]

        data = json.loads(response)

        while not data["entry"][0]["content"]["isDone"]:

            time.sleep(0.1)

            response = httplib2.Http(disable_ssl_certificate_validation=True) \

                .request(self.baseurl + '/services/search/jobs/' + sid +

                        "?output_mode=json",

                        'POST',

                        headers={

                            'Authorization': 'Splunk %s' % self.sessionKey},

                        body=urllib.parse.urlencode({}))[1]

            data = json.loads(response)

        request_time = time.time() - start

        print("result event count:", data["entry"][0]["content"]["eventCount"], "request time:", request_time)

        result_response = httplib2.Http(disable_ssl_certificate_validation=True) \

            .request(self.baseurl + '/services/search/jobs/' + sid + "/results?count=0",

                    'GET',

                    headers={

                        'Authorization': 'Splunk %s' % self.sessionKey},

                    body=urllib.parse.urlencode({"output_mode": "json"}))[1]

        results = json.loads(result_response)["results"]

        print(len(results))

        # assert data["entry"][0]["content"]["eventCount"] == len(results)

        end = time.time()

        print("result count:", len(results), "result request time:", end - start)

        # response = httplib2.Http(disable_ssl_certificate_validation=True) \

        #    .request(self.baseurl + '/services/search/jobs/' + sid +

        #              "?output_mode=json", 'DELETE',

        #              headers={

        #                  'Authorization': 'Splunk %s' % self.sessionKey},

        #              body=urllib.parse.urlencode({}))[1]

        return results

    def run(self, searchQuery, earliest_time=None, latest_time=None):

        start = time.time()

        sid = self.submit_job(searchQuery, earliest_time)

        result = self.request_results(sid)

        end = time.time()

        print("search time:", end - start)

        return result

调用

print(">>>>>>>>>>>>>>>>SplunkQuery>>>>>>>>>>>>>>>>>>>>>")

Q = SplunkQuery()

result = Q.run(searchQuery='''index=xx sourcetype=xx''')

print(result[0])

result = Q.run(searchQuery='''index=xx sourcetype=xx''', earliest_time="2020-06-19T12:00:00")

print(result[5])

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI