温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Python脚本删除查询出来的数据进

发布时间:2020-05-28 17:03:58 来源:亿速云 阅读:274 作者:鸽子 栏目:系统运维

需求背景

业务系统将各类的报表和统计数据存放于ES中,由于历史原因,系统每天均以全量方式进行统计,随着时间的推移,ES的数据存储空间压力巨大。同时由于没有规划好es的索引使用,个别索引甚至出现超过最大文档数限制的问题,现实情况给运维人员带来的挑战是需要以最小的代价来解决这个问题。下面以内网开发、测试环境举例使用python脚本解决这个问题。

Each Elasticsearch shard is a Lucene index. There is a maximum number of documents you can have in a single Lucene index. As of LUCENE-5843, the limit is 2,147,483,519 (= Integer.MAX_VALUE - 128) documents. You can monitor shard sizes using the _cat/shards API.

实现思路

es本身支持“_delete_by_query”的形式对查询出来的数据进行删除。首先我们通过”_cat/indices“入口获取当前es服务上所有的索引信息。Python脚本删除查询出来的数据进

第一列表示索引当前的健康状态
第三列表示索引的名称
第四列表示索引在服务器上的存储目录名
第五、六列表示索引的副本数和分片信息
第七列表示当前索引的文档数
最后两列分别表示当前索引的存储占用空间,倒数第二列等于倒数第一列乘以副本数

其次我们通过curl形式拼接成删除命令发送给es服务端执行,其中createtime字段为数据的产生时间,单位为毫秒

curl -X POST "http://192.168.1.19:9400/fjhb-surveyor-v2/_delete_by_query?pretty" -H 'Content-Type: application/json' -d '
     {"query":{ "range": {
            "createTime": {   
                "lt": 1580400000000,    
                "format": "epoch_millis"
            }
        }
}}'

具体实现

#!/usr/bin/python
# -*- coding: UTF-8 -*-

###导入必须的模块
import requests
import time
import datetime
import os

#定义获取ES数据字典函数,返回索引名和索引占用存储空间大小字典
def getData(env):
    header = {"Content-Type": "application/x-www-form-urlencoded",
              "user-agent": "User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36"
}
    data = {}
    with open('result.txt','w+') as f:
        req = requests.get(url=env+'/_cat/indices',headers=header).text
        f.write(req)
        f.seek(0)
        for line in f.readlines():
            data[line.split()[2]] = line.split()[-1]
    return data

#定义unix时间转换函数,以毫秒形式返回,返回值为int类型
def unixTime(day):
    today = datetime.date.today()
    target_day = today + datetime.timedelta(day)
    unixtime = int(time.mktime(target_day.timetuple())) * 1000
    return unixtime

#定义删除es数据函数,调用系统curl命令进行删除,需要传入环境、需要删除数据的时间范围(即多少天之前的数据)参数,由于索引数量众多,我们只处理超过1G的索引即可
def delData(env,day):
    header = 'Content-Type: application/json'
    for key, value in getData(env).items():
        if 'gb' in value:
            size = float(value.split('gb')[0])
            if size > 1:
                url = dev + '/' + key + '/_delete_by_query?pretty'
                command = ("curl -X POST \"%s\" -H '%s' "
                           "-d '{\"query\":{ \"range\": {\"createTime\": {\"lt\": %s,\"format\": \"epoch_millis\"}}}}'" % (
                           url, header, day))
                print(command)
                os.system(command)

if __name__ == '__main__':
    dev = 'http://192.168.1.19:9400'
    test1 = 'http://192.168.1.19:9200'
    test2 = 'http://192.168.1.19:9600'
    day = unixTime(-30)
    delData(dev,day)
    delData(test1,day)
    delData(test2,day)

结果验证

删除前
Python脚本删除查询出来的数据进
删除后
Python脚本删除查询出来的数据进

注意事项

1、目前脚本采用操作系统crontab方式调度,一天运行一次
2、首次删除因为数据量庞大,需要耗费较长时间,后续每天删除一天的数据量,删除效率尚可
3、脚本未考虑服务器报错等例外情况与告警通知,实际应用场景需要进行补充
4、脚本未考虑日志记录,实际应用场景需要进行补充

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI