小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
目前仅支持ceph的s3方案,具体配置看说明
# -*- coding: utf-8 -*- import requests import json from email.utils import formatdate import hmac py3k = False from hashlib import sha1 as sha try: from urlparse import urlparse from base64 import encodestring except: py3k = True from urllib.parse import urlparse from base64 import encodebytes as encodestring class AuthBase(object): """Base class that all auth implementations derive from""" def __call__(self, r): raise NotImplementedError('Auth hooks must be callable.') class S3Auth(AuthBase): """Attaches AWS Authentication to the given Request object.""" service_base_url = 's3.amazonaws.com' # List of Query String Arguments of Interest special_params = [ 'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment', 'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads', 'uploadId', 'response-content-type', 'response-content-language', 'response-expires', 'response-cache-control', 'delete', 'lifecycle', 'response-content-disposition', 'response-content-encoding' ] def __init__(self, access_key, secret_key, service_url=None): if service_url: self.service_base_url = service_url self.access_key = str(access_key) self.secret_key = str(secret_key) self.au ="" def __call__(self, r): # Create date header if it is not created yet. if not 'date' in r.headers and not 'x-amz-date' in r.headers: r.headers['date'] = formatdate( timeval=None, localtime=False, usegmt=True) signature = self.get_signature(r) if py3k: signature = signature.decode('utf-8') r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature) self.au = r.headers # print self.au return r def get_signature(self, r): canonical_string = self.get_canonical_string( r.url, r.headers, r.method) if py3k: key = self.secret_key.encode('utf-8') msg = canonical_string.encode('utf-8') else: key = self.secret_key msg = canonical_string h = hmac.new(key, msg, digestmod=sha) return encodestring(h.digest()).strip() def get_canonical_string(self, url, headers, method): parsedurl = urlparse(url) objectkey = parsedurl.path[1:] query_args = sorted(parsedurl.query.split('&')) bucket = parsedurl.netloc[:-len(self.service_base_url)] if len(bucket) > 1: # remove last dot bucket = bucket[:-1] interesting_headers = { 'content-md5': '', 'content-type': '', 'date': ''} for key in headers: lk = key.lower() try: lk = lk.decode('utf-8') except: pass if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')): interesting_headers[lk] = headers[key].strip() # If x-amz-date is used it supersedes the date header. if not py3k: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' else: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' buf = '%s\n' % method for key in sorted(interesting_headers.keys()): val = interesting_headers[key] if key.startswith('x-amz-'): buf += '%s:%s\n' % (key, val) else: buf += '%s\n' % val # append the bucket if it exists if bucket != '': buf += '/%s' % bucket # add the objectkey. even if it doesn't exist, add the slash buf += '/%s' % objectkey params_found = False # handle special query string arguments for q in query_args: k = q.split('=')[0] if k in self.special_params: if params_found: buf += '&%s' % q else: buf += '?%s' % q params_found = True return buf class S3Admin(): def __init__(self): self.access_key = '' #填access_key self.secret_key = '' #填secret_key self.endpoint = 's3.ceph.work' #填endpoint def get_bucket_usage(self,bucketname): #url = 'http://%s/%s/' % (self.endpoint, bucketname) #path style url = 'http://%s.%s/' % (bucketname,self.endpoint) #virtual hosted styple r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint)) print r.headers return r.headers s3client = S3Admin() bucket_name= 'xxx' #替换成相应的bucket名称 result = s3client.get_bucket_usage(bucket_name) print 'objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'],result['X-RGW-Bytes-Used']) #注意 objects_num 为当前bucket内的object数量,total_Bytes_Used为当前bucket内的已用容量(单位为Byte)
以上是“python如何实现查询bucket已用量脚本”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。