本篇内容介绍了“s3cmd数据操作怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
操作流程:
1.客户端完成文件切片,之后向API server提交上传操作请求,生成对应的presign URL(步骤1和2,如果想控制客户端上传数量,可以在这个阶段生成指定数量的token)
2.使用生成的presign URL构造HTTP请求,向S3服务上传数据。(步骤3)
3.客户端完成所有分块上传以后,向API server提交Complete请求,之后再由API server向S3服务发送complete请求(步骤4和5)
4.客户端获取到API server的返回,完成最后的操作。(可以考虑在这里回收token)
优点:
1. accesskey和secretkey不会存储在客户端,避免key泄露
2. 每个presign URL对应一个对象(keyname)的一个分块,在有效时间内可以重复上传以覆盖该分块,比较灵活。
3. 服务端可以结合各种Auth系统完成客户端的认证和授权,方便集成现有业务。
4. 客户端上传下载方式灵活,拿到presignURL以后,可以通过任意支持HTTP协议的客户端进行上传下载操作。
5. 适合大文件上传:与之前介绍的单次Presign上传方式相比,在数据上传阶段支持多个分块并发上传,上传效率有很大提升。
缺点:
1. 上传步骤需要多次交互,流程稍复杂。
2. 受限于S3分块上传标准(除最后一个分块外,每个分块不能小于5MB),文件小于5MB时不适用该方法;切分数量也需保证各分块不小于5MB。
安装服务端依赖
pip install boto
pip install flask-restful
服务端demo代码如下:
# -*- coding: utf-8 -*-
"""Demo API server issuing presigned URLs for S3 multipart uploads.

POST /presign  -- initiate a multipart upload and return one presigned PUT
                  URL per requested part number, plus the ``UploadID``.
POST /complete -- complete a previously initiated multipart upload.

The access/secret key stay on this server; clients only receive URLs signed
with AWS signature version 2 (query-string authentication).
"""
import hmac
import os
import re
import time
from base64 import b64encode
from hashlib import sha1 as sha

import boto
import boto.s3.connection
from boto.s3.multipart import MultiPartUpload
from flask import Flask, request
from flask_restful import Api, Resource

try:  # Python 2
    from urllib import quote
    py3k = False
except ImportError:  # Python 3
    from urllib.parse import quote
    py3k = True

S3_ENDPOINT = 's3.ceph.work'  # S3 service endpoint (host name)
S3_BUCKET = 'multi-upload'    # bucket receiving the uploads
# Credentials can be supplied via the environment so they need not be
# committed to source; the empty defaults match the original placeholders.
S3_ACCESS_KEY = os.environ.get('S3_ACCESS_KEY', '')
S3_SECRET_KEY = os.environ.get('S3_SECRET_KEY', '')

# Form fields whose name is all digits map "<partNumber>" -> "<part MD5>".
PART_NUMBER_RE = re.compile(r'^\d+$')
# Form fields carrying this prefix become user metadata on the object.
META_PREFIX = 'x-amz-meta-'

app = Flask(__name__)
api = Api(app)


class MultiPartUpload_Presign(MultiPartUpload):
    """boto MultiPartUpload handle re-attached to an existing upload ID.

    Lets a later request complete an upload that an earlier request started.
    """

    def __init__(self, id, bucket, key_name):
        MultiPartUpload.__init__(self)
        self.id = id
        self.bucket = bucket
        self.key_name = key_name

    def complete_upload(self):
        # to_xml() builds the CompleteMultipartUpload body from the parts boto
        # lists for this upload ID (see boto's MultiPartUpload).
        xml = self.to_xml()
        return self.bucket.complete_multipart_upload(self.key_name, self.id, xml)


class S3PreSign(object):
    """Presigned-URL helper for one object's multipart upload.

    :param object_name: key of the object being uploaded.
    :param metadata: headers/user metadata used when initiating the upload.
    :param policy: canned ACL used when initiating the upload.
    :param upload_id: when given, re-use this existing upload instead of
        initiating a new one (required when completing an upload).
    """

    def __init__(self, object_name, metadata=None, policy=None, upload_id=None):
        self.service_url = S3_ENDPOINT
        self.access_key = S3_ACCESS_KEY
        self.secret_key = S3_SECRET_KEY
        self.bucket_name = S3_BUCKET
        self.object_name = str(object_name)
        conn = boto.connect_s3(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            host=self.service_url,
            port=80,
            is_secure=False,  # plain HTTP; build_url() also emits http:// URLs
            # For path-style addressing use instead:
            # calling_format=boto.s3.connection.OrdinaryCallingFormat(),
            calling_format=boto.s3.connection.SubdomainCallingFormat(),
        )
        self.bucket = conn.get_bucket(self.bucket_name)
        if upload_id is None:
            self.upload_ID = self.Make_uploadID(
                self.object_name, metadata=metadata, policy=policy)
        else:
            # Attaching to an existing upload: initiating here would leave an
            # orphaned, never-completed multipart upload behind.
            self.upload_ID = str(upload_id)

    def Make_uploadID(self, object_name, metadata=None, policy=None):
        """Initiate a multipart upload for object_name and return its upload ID."""
        mpu = self.bucket.initiate_multipart_upload(
            object_name, metadata=metadata, policy=policy)
        return mpu.id

    def complete_upload(self, upload_ID):
        """Complete upload ``upload_ID``; return 200 on success, 422 on failure."""
        mpu = MultiPartUpload_Presign(
            id=upload_ID, bucket=self.bucket, key_name=self.object_name)
        try:
            mpu.complete_upload()
        except Exception:
            app.logger.exception('completing upload %s of %s failed',
                                 upload_ID, self.object_name)
            return 422
        return 200

    def get_signature_str(self, sign_str):
        """Return the HMAC-SHA1 signature of sign_str, base64- and URL-encoded."""
        key = self.secret_key
        msg = sign_str
        if py3k:
            key = key.encode('utf-8')
            msg = msg.encode('utf-8')
        digest = hmac.new(key, msg, digestmod=sha).digest()
        # base64 output may contain '+', '/' and '='; percent-encode all of
        # them so the value survives as a query-string parameter ('+' would
        # otherwise be decoded as a space).
        return quote(b64encode(digest), safe='')

    def build_url(self, expires, partNumber, Signature):
        """Assemble the presigned part-upload URL.

        object_name and the upload ID are inserted verbatim (not URL-escaped).
        """
        return ("http://{bucket_name}.{service_url}/{object_name}"
                "?uploadId={uploadId}&partNumber={partNumber}&Expires={Expires}"
                "&AWSAccessKeyId={AWSAccessKeyId}&Signature={Signature}").format(
            bucket_name=self.bucket_name,
            service_url=self.service_url,
            object_name=self.object_name,
            uploadId=self.upload_ID,
            partNumber=partNumber,
            Expires=expires,
            AWSAccessKeyId=self.access_key,
            Signature=Signature,
        )

    def build_url_with_partid(self, expires, partNumber, partMd5):
        """Return a presigned PUT URL for one part.

        :param expires: absolute expiry time (Unix seconds).
        :param partNumber: part number of this part.
        :param partMd5: base64 MD5 of the part; it is part of the signed
            string, so the client must send the same value as Content-MD5.
        """
        # Sub-resources in the canonical resource are listed in sorted order.
        sign_str = ("PUT\n{partMd5}\n\n{Expires}\n/{bucket_name}/{object_name}"
                    "?partNumber={partNumber}&uploadId={uploadId}").format(
            partMd5=partMd5,
            Expires=expires,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            partNumber=partNumber,
            uploadId=self.upload_ID,
        )
        return self.build_url(expires, partNumber, self.get_signature_str(sign_str))


class MultiPart_List(Resource):
    """POST /presign: initiate an upload and presign one URL per part."""

    def post(self):
        form = request.form
        if 'keyname' not in form:
            return "no key", 400
        keyname = form['keyname']
        if 'expires' not in form:
            return "no expires", 400
        try:
            ttl = int(form['expires'])
        except ValueError:
            return "invalid expires", 400
        if ttl <= 0:
            return "invalid expires", 400

        metadata = {}
        policy = None
        if 'contenttype' in form:
            metadata['Content-Type'] = str(form['contenttype'])
        if 'x-amz-acl' in form:
            policy = str(form['x-amz-acl'])

        part_md5s = {}
        for field in form:
            if PART_NUMBER_RE.match(field):
                part_md5s[field] = form[field]
            if field.startswith(META_PREFIX):
                metadata[field[len(META_PREFIX):]] = str(form[field])

        app.logger.debug('presign key=%s expires=%s policy=%s metadata=%s',
                         keyname, ttl, policy, metadata)
        # Pass metadata/policy through: they are applied when the upload is
        # initiated, which is the only point where S3 accepts them.
        s3client = S3PreSign(keyname, metadata=metadata, policy=policy)
        expires = int(time.time()) + ttl
        result = {'UploadID': s3client.upload_ID}
        for part_number, part_md5 in part_md5s.items():
            result[part_number] = s3client.build_url_with_partid(
                expires, part_number, part_md5)
        return result, 201


class Complete_MultiPart(Resource):
    """POST /complete: complete the multipart upload named by ``uploadid``."""

    def post(self):
        if 'keyname' not in request.form:
            return "no key", 400
        keyname = request.form['keyname']
        if 'uploadid' not in request.form:
            return "no UploadID", 400
        uploadid = request.form['uploadid']
        # Re-attach to the existing upload rather than initiating a new one.
        s3client = S3PreSign(keyname, upload_id=uploadid)
        status = s3client.complete_upload(uploadid)
        return {"status_code": status}, status


api.add_resource(MultiPart_List, '/presign')
api.add_resource(Complete_MultiPart, '/complete')

if __name__ == '__main__':
    # debug=True enables the interactive Werkzeug debugger: local use only.
    app.run(debug=True)
安装客户端依赖
pip install requests
客户端demo代码如下:
# -*- coding: utf-8 -*- import requests from base64 import encodestring from hashlib import md5 import os import json from multiprocessing import Pool def multipart_upload_with_part(url_, part_file_path, partMD5): headers = {} headers["Content-MD5"] = partMD5 with open(part_file_path,'r') as fh: response = requests.put(url_, headers=headers, data=fh.read()) if response.status_code == 200: print "{} upload Sucessful !".format(part_file_path) class S3client(): def __init__(self, key_name, expires,part_num, uploadfile_path, policy=None, contenttype=None, metadata=None ,processes_num=2): self.multipart_data = {} if key_name: self.multipart_data['keyname'] = key_name if expires: self.multipart_data['expires'] = expires if policy: self.multipart_data['x-amz-acl'] = policy if contenttype: self.multipart_data['contenttype'] = contenttype if metadata: for k in metadata: self.multipart_data[k] = metadata[k] self.part_num = part_num self.processes_num = processes_num self.uploadfile_path = uploadfile_path self.server = 'http://localhost:5000/' #这里填你API服务器地址 self.upload_file_list_ = {} def split_file(self): filelist = [] statinfo = os.stat(self.uploadfile_path) chunksize = statinfo.st_size / self.part_num print "File size: %d(MB)" % (statinfo.st_size / (1024 * 1024)) print self.uploadfile_path,chunksize with open(self.uploadfile_path, "rb") as f: index = 1 while True: chunk = f.read(chunksize) if (chunk): fn = "%s.part.%d" % (self.uploadfile_path, index) # print "creating", fn with open(fn, "wb") as fw: fw.write(chunk) partMD5 = self.compute_hash(fn) tmp_ = {} tmp_[fn] = str(partMD5) filelist.append(tmp_) index = index + 1 else: break return filelist def compute_hash(self, filepath, buf_size=8192, size=None, hash_algorithm=md5): hash_obj = hash_algorithm() with open(filepath) as fp: spos = fp.tell() if size and size < buf_size: s = fp.read(size) else: s = fp.read(buf_size) while s: if not isinstance(s, bytes): s = s.encode('utf-8') hash_obj.update(s) if size: size -= len(s) if 
size <= 0: break if size and size < buf_size: s = fp.read(size) else: s = fp.read(buf_size) base64_digest = encodestring(hash_obj.digest()).decode('utf-8') if base64_digest[-1] == '\n': base64_digest = base64_digest[0:-1] return base64_digest def make_upload_list(self): upload_file_list = self.split_file() for f in upload_file_list: part_path = f.keys()[0] partMD5 = f.values()[0] # partnum_ = f.keys()[0].split(".")[-1] yield {part_path:partMD5} def get_multipart_presignurl(self): upload_file_list = self.make_upload_list() for i in upload_file_list: self.multipart_data[i.keys()[0].split(".")[-1]] = i.values()[0] self.upload_file_list_[i.keys()[0].split(".")[-1]] = {i.keys()[0]:i.values()[0]} url_ = self.server + "presign" r = requests.post(url_, data=self.multipart_data) allurl_ = json.loads(r.text) UploadID = allurl_.pop('UploadID') return UploadID,allurl_ def complete(self,UploadID,key_name): data = {"uploadid":UploadID,'keyname':key_name} url_ = self.server + "complete" r = requests.post(url_, data=data) if r.status_code == 200: print "Multipart upload finished!" else: print "Multipart upload failed!" def upload_mulprocess(self,allurl_): p = Pool(processes=self.processes_num) for url in allurl_: partNUm = url tmp_file = self.upload_file_list_[partNUm] filepath = tmp_file.keys()[0] partMD5 = tmp_file.values()[0] put_url = allurl_[url] p.apply_async(multipart_upload_with_part, (put_url,filepath,partMD5,)) print 'Waiting for all subprocesses done...' 
p.close() p.join() if __name__ == "__main__": key_name = 'abc.json' #上传的object名称 part_num = 6 #文件切分数量 expires = 300 #签名有效时长 file_path = '/tmp/abc.json' #上传文件本地路径 processes_num = 2 #上传并发数 contenttype = 'application/json' #文件的Content-type policy = 'public-read' #设置object的ACL权限 metadata = {'x-amz-meta-abc':'abcd'} #object的metadata #第一步:参数初始化 s3client = S3client(key_name,expires,part_num,file_path,policy,contenttype,metadata,2) #第二步:生成PresignURL UploadID,upload_file_list = s3client.get_multipart_presignurl() #第三步:使用生成的PresignURL上传数据 s3client.upload_mulprocess(upload_file_list) #第四步:提交compelte请求,完成最后的各个分块数据逻辑合并 s3client.complete(UploadID,key_name)
“s3cmd数据操作怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。