扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
吉林网站制作公司哪家好,找创新互联公司!从网页设计、网站建设、微信开发、APP开发、自适应网站建设等网站项目制作,到程序开发,运营维护。创新互联公司成立与2013年到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联公司。
目前仅支持ceph的s3方案,具体配置看说明
# -*- coding: utf-8 -*- import requests import json from email.utils import formatdate import hmac py3k = False from hashlib import sha1 as sha try: from urlparse import urlparse from base64 import encodestring except: py3k = True from urllib.parse import urlparse from base64 import encodebytes as encodestring class AuthBase(object): """Base class that all auth implementations derive from""" def __call__(self, r): raise NotImplementedError('Auth hooks must be callable.') class S3Auth(AuthBase): """Attaches AWS Authentication to the given Request object.""" service_base_url = 's3.amazonaws.com' # List of Query String Arguments of Interest special_params = [ 'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment', 'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads', 'uploadId', 'response-content-type', 'response-content-language', 'response-expires', 'response-cache-control', 'delete', 'lifecycle', 'response-content-disposition', 'response-content-encoding' ] def __init__(self, access_key, secret_key, service_url=None): if service_url: self.service_base_url = service_url self.access_key = str(access_key) self.secret_key = str(secret_key) self.au ="" def __call__(self, r): # Create date header if it is not created yet. if not 'date' in r.headers and not 'x-amz-date' in r.headers: r.headers['date'] = formatdate( timeval=None, localtime=False, usegmt=True) signature = self.get_signature(r) if py3k: signature = signature.decode('utf-8') r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature) self.au = r.headers # print self.au return r def get_signature(self, r): canonical_string = self.get_canonical_string( r.url, r.headers, r.method) if py3k: key = self.secret_key.encode('utf-8') msg = canonical_string.encode('utf-8') else: key = self.secret_key msg = canonical_string h = hmac.new(key, msg, digestmod=sha) return encodestring(h.digest()).strip() def get_canonical_string(self, url, headers, method): parsedurl = urlparse(url) objectkey = parsedurl.path[1:] query_args = sorted(parsedurl.query.split('&')) bucket = parsedurl.netloc[:-len(self.service_base_url)] if len(bucket) > 1: # remove last dot bucket = bucket[:-1] interesting_headers = { 'content-md5': '', 'content-type': '', 'date': ''} for key in headers: lk = key.lower() try: lk = lk.decode('utf-8') except: pass if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')): interesting_headers[lk] = headers[key].strip() # If x-amz-date is used it supersedes the date header. if not py3k: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' else: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' buf = '%s\n' % method for key in sorted(interesting_headers.keys()): val = interesting_headers[key] if key.startswith('x-amz-'): buf += '%s:%s\n' % (key, val) else: buf += '%s\n' % val # append the bucket if it exists if bucket != '': buf += '/%s' % bucket # add the objectkey. even if it doesn't exist, add the slash buf += '/%s' % objectkey params_found = False # handle special query string arguments for q in query_args: k = q.split('=')[0] if k in self.special_params: if params_found: buf += '&%s' % q else: buf += '?%s' % q params_found = True return buf class S3Admin(): def __init__(self): self.access_key = '' #填access_key self.secret_key = '' #填secret_key self.endpoint = 's3.ceph.work' #填endpoint def get_bucket_usage(self,bucketname): #url = 'http://%s/%s/' % (self.endpoint, bucketname) #path style url = 'http://%s.%s/' % (bucketname,self.endpoint) #virtual hosted styple r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint)) print r.headers return r.headers s3client = S3Admin() bucket_name= 'xxx' #替换成相应的bucket名称 result = s3client.get_bucket_usage(bucket_name) print 'objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'],result['X-RGW-Bytes-Used']) #注意 objects_num 为当前bucket内的object数量,total_Bytes_Used为当前bucket内的已用容量(单位为Byte)
以上是“python如何实现查询bucket已用量脚本”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流