对象存储

大文件分段上传

2024-09-05 03:40:01

前提条件

  • 开通对象存储服务。

  • 在天翼云对象存储控制台获取Access Key、Secret Key和外网访问控制的endpoint。

  • 安装Python环境,下载对应版本的ZOS官方Python SDK (本文以sdk_python3.X.tar为例),请参见开发者文档

环境配置

l  解压sdk_python3.X.tar,在解压出的sdk文件夹下,打开cmd导入需要的包。

pip3 install -r requirements.txt

l  把sdk文件夹中的service-2.sns.sdk-extras.json和service-2.s3.sdk-extras.json文件放到“C:\Users\你的用户名.aws\models\s3\2006-03-01”文件夹(如果不存在该文件,可自行创建)下。

分段上传

操作场景

  • 大型文件上传:当需要上传大型文件时,使用分段上传可以更高效地完成任务。相比于一次性上传整个文件,分段上传允许在网络故障或其他原因导致上传中断时,只需要重新传输中断的片段,而不需要重新上传整个文件。

  • 网络不稳定环境:在网络连接不稳定或带宽有限的环境中,分段上传可以降低因网络中断或超时导致整个文件上传失败的风险。当上传过程中发生中断,只需要重新上传中断的片段,而不需要重新上传整个文件。

  • 文件大小不确定:可以在需要上传的文件大小还不确定的情况下开始上传,这种场景在视频监控等行业应用中比较常见。

操作步骤

1.     创建py文件,并引入boto3包的session模块。

from boto3.sessionimport Session

2.     配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。

access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint

3.     获取对象存储服务的操作客户端。

session =Session(access_key, secret_key)
s3_client =session.client('s3', endpoint_url=url)

4.     开启分块上传,获取uploadID。

# 在’’中填入要上传对象的桶名(必须已存在)和对象名。
dict_cmp = s3_client.create_multipart_upload(Bucket='?', Key='?')
uploadID = dict_cmp['UploadId'] # 记录uploadID,用于后续上传分块。

5.     分块读取要上传的文件。

# 设置列表,用于收集文件的所有分块
chunks = []
# 分块读取文件
with open("此处输入你的文件地址", 'rb') as f:
    while True:
      chunk = f.read(5 * 1024 * 1024) # 此处输入分块大小,本示例是5M,分块大小最低为5M
      if not chunk:
        break
      chunks.append(chunk)

6.     上传所有分块并记录各个分块的ETag和编号。

# 设置列表,用于记录上传分块的ETag和编号。
parts = []
i = 0
for chunk in chunks:
    i += 1
    # 上传分块
    dict_up = s3_client.upload_part(Bucket='?', Body=chunk,
                                    Key='?', PartNumber=i,
                                    UploadId=uploadID)
    # 记录该分块的ETag和编号
    part = {
        'ETag': dict_up['ETag'],
        'PartNumber': i,
    }
    parts.append(part)

7.     完成所有分块的拼接,存入ZOS。

# 完成所有分块拼接后,上传成功。
s3_client.complete_multipart_upload(
        Bucket='?',
        Key='?',
        MultipartUpload={
            'Parts': parts
        },
        UploadId=uploadID,
    )

整体代码

from boto3.session import Session
 
access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint
 
 
key = '?' # 传入桶内后的文件名
bucketName = '?' # 要传入文件的桶名
 
try:
    session = Session(access_key, secret_key)
    s3_client = session.client('s3', endpoint_url=url)
    # 开启分块上传,获取uploadID
    dict_cmp = s3_client.create_multipart_upload(Bucket=bucketName, Key=key)
    uploadID = dict_cmp['UploadId']#记录uploadID,用于后续上传分块。
 
    filePos = "此处输入文件路径"
    size = 5 * 1024 * 1024 #此处输入分块大小,本示例是5M,分块大小最低为5M
 
    # 设置列表,用于收集文件的所有分块
    chunks = []
    # 分块读取文件
    with open(filePos, 'rb') as f:
        while True:
            chunk = f.read(size)
            if not chunk:
                break
            chunks.append(chunk)
    # 设置列表,用于收集上传分块的ETag和编号。
    parts = []
    i = 0
    for chunk in chunks:
        i += 1
        # 上传分块
        dict_up = s3_client.upload_part(Bucket=bucketName, Body=chunk,
                                        Key=key, PartNumber=i,
                                        UploadId=uploadID)
        # 记录该分块的ETag和编号
        part = {
            'ETag': dict_up['ETag'],
            'PartNumber': i,
        }
        parts.append(part)
 
    # 完成整个拼接传入
    s3_client.complete_multipart_upload(
        Bucket=bucketName,
        Key=key,
        MultipartUpload={
            'Parts': parts
        },
        UploadId=uploadID,
    )
    print('文件上传成功!')
except BaseException:
    print("上传异常,请检查各个参数后重试。")
finally:
    # 关闭文件流
    f.close()

追加上传

操作场景

通过普通上传创建的对象,用户无法在原对象上进行追加写操作,如果对象内容发生了改变,只能重新上传同名对象来进行修改。这在日志、视频监控等数据复写较频繁的场景中使用不方便。所以可以通过追加上传的方式来只上传增加部分的内容,增强扩展性,提高文件的上传效率。

操作步骤

1.     创建py文件,并引入boto3包的session模块。

from boto3.sessionimport Session

2.     配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。

access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint

3.     获取对象存储服务的操作客户端。

session =Session(access_key, secret_key)
s3_client =session.client('s3', endpoint_url=url)

4.     读取文件并记录文件md5值。

with open('输入要上传文件的路径', 'rb') as file:
    data = file.read()
    md5 = hashlib.md5(data).digest()
    md5 = base64.b64encode(md5)

5.     上传对象。
(1)首次上传

s3_client.put_object(Bucket='输入要传入的桶名',
                     Metadata=dict(m1='m1'),
                     Body=data,
                     Key='输入存入后对象的键值',
                     ContentMD5=str(md5, 'utf-8'),
                     Append=True,# 开启追加上传
                     AppendPosition=0)# 指定追加上传开始的位置

(2)追加上传

# 获取需要追加上传的对象信息
response = s3_client.head_object(Bucket="bucketblocks", 
                                 Key="mac.txt")
# 获取当前对象长度
appendPos = response['ContentLength']
 
s3_client.put_object(Bucket='输入要传入的桶名',
                     Metadata=dict(m1='m1'),
                     Body=data,
                     Key='输入存入后对象的键值',
                     ContentMD5=str(md5, 'utf-8'),
                     Append=True,# 开启追加上传
                     AppendPosition=appendPos)# 指定追加上传开始的位置

整体代码

from boto3.session import Session
import hashlib
import base64
 
access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint
 
# 上传文件桶的桶名
bname = '输入要传入的桶名'
# 文件在桶内存储的key值
key = '输入存入后对象的键值'
# 追加上传开始的位置,默认为0
appendPos = 0
 
try:
    # 读取文件并记录文件md5值
    with open('输入要上传文件的路径', 'rb') as file:
        data = file.read()
        md5 = hashlib.md5(data).digest()
        md5 = base64.b64encode(md5)
    # 获取对象存储服务的操作客户端
    session = Session(access_key, secret_key)
    s3_client = session.client("s3", endpoint_url=url)
    # 获取桶内所有对象信息
    response = s3_client.list_objects(Bucket=bname)
    # 遍历所有对象键值
    for obj in response['Contents']:
        # 桶中存在与待上传对象同key的对象,需要判断该对象能否进行追加上传
        if obj['Key'] == key:
            res = s3_client.head_object(Bucket=bname, Key=key)
            if res['ResponseMetadata']['HTTPHeaders']['x-rgw-object-type'] != 'Appendable':
                print('该对象不是Appendable类型,无法进行追加上传.')
                raise Exception
            #桶内对象可以进行追加上传,更新追加上传开始的位置
            appendPos = res['ContentLength']
    # 完成追加上传
    s3_client.put_object(Bucket=bname,
                         Metadata=dict(m1='m1'),
                         Body=data,
                         Key=key,
                         ContentMD5=str(md5, 'utf-8'),
                         Append=True,
                         AppendPosition=appendPos)
    if appendPos == 0:
        print('初次上传成功.')
    else:
        print('追加上传成功.')
 
except BaseException:
    print("参数有问题,请修改后重试.")
finally:
    # 关闭文件流
    file.close()

断点续传

操作场景

  • 大文件上传:当需要上传大文件时,如视频、备份文件或软件包等,由于上传时间较长,在上传过程中可能会遇到各种问题,例如网络中断、上传工具崩溃或用户手动中断等。断点续传的操作场景允许用户在上传中断后,能够从中断点继续上传,而不需要重新上传整个文件。

  • 网络不稳定:在网络环境不稳定的情况下,如移动网络或低带宽连接,大文件的上传过程可能会中断或超时。通过断点续传,在宽带不稳定的情况下,上传仍可恢复并继续进行。

  • 长时间上传:某些上传任务可能需要较长的时间才能完成,这可能会增加上传过程中意外中断的风险。断点续传可以将上传任务分段处理,并保存上传进度信息,以便在上传中断后能够恢复并继续上传。

  • 客户端或服务端故障:在上传过程中,当客户端或服务端发生故障时,断点续传允许用户重新连接并从中断点继续上传,而不会对上传任务造成重大影响。

操作步骤

1.     创建py文件,并引入boto3包的session模块。

from boto3.sessionimport Session

2.     配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。

access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint

3.     获取对象存储服务的操作客户端。

session =Session(access_key, secret_key)
s3_client =session.client('s3', endpoint_url=url)

4.     查询桶中还没有完成或放弃的分块上传,获取需要续传对象的UploadId和Key。

response = s3_client.list_multipart_uploads(Bucket='输入对象传入的桶名')
uploadInfo = response['Uploads']

5.     根据上一步得到的UploadId和Key来获取当前对象已经上传完成的分段信息。

response = s3_client.list_parts(Bucket='输入对象传入的桶名', Key='Key值',
                         UploadId='UploadId值')
finishedPartsInfo = response['Parts']
# 设置列表,用于记录当前已经完成上传分块的编号。
finishedPartNumbers = []
# 记录分块的大小。
partSize = 0
# 设置列表,用于收集上传分块的ETag和编号。
parts = []
 
for finishedPart in finishedPartsInfo:
    finishedPartNumbers.append(finishedPart['PartNumber'])
    if partSize < finishedPart['Size']:
        partSize = finishedPart['Size']
    part = {
        'ETag': finishedPart['ETag'],
        'PartNumber': finishedPart['PartNumber']
    }
    parts.append(part)

           6.     上传对象中未完成的分段,并记录各分段的ETag信息。

# 设置列表,用于收集文件的所有分块
chunks = []
# 分块读取文件
with open("此处输入你的文件地址", 'rb') as f:
    while True:
        chunk = f.read(5*1024*1024)  # 此处输入分块大小,本示例是5M,分块大小最低为5M
        if not chunk:
            break
        chunks.append(chunk)
 
i = 0
for chunk in chunks:
    i += 1
    # 当该分块已经完成上传时,跳过。
    if i in finishedPartNumbers:
        continue
    # 上传分块
    response = s3_client.upload_part(Bucket=bucketName, Body=chunk,
                                    Key=key, PartNumber=i,
                                    UploadId='UploadId值')
    # 记录该分块的ETag和编号
    part = {
        'ETag': response['ETag'],
        'PartNumber': i,
    }
    parts.append(part)

7.     完成所有分块的拼接,存入ZOS。

# 完成所有分块拼接后,上传成功。
s3_client.complete_multipart_upload(
        Bucket=bucketName,
        Key=key,
        MultipartUpload={
            'Parts': parts
        },
        UploadId=uploadID,
   )


整体代码

from boto3.session import Session

access_key = "此处输入你的Access Key" # 这里输入你的Access Key
secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key
url = "此处输入你的endpoint" # 这里输入你的endpoint

bucketName = '?' # 要传入文件的桶名
filePos = "?" # 待上传文件所在位置

try:
   session = Session(access_key, secret_key)
   s3_client = session.client('s3', endpoint_url=url)
   # 查询桶中还没有完成或放弃的分块上传,获取需要续传对象的UploadId和Key。
   response = s3_client.list_multipart_uploads(Bucket=bucketName)
   uploadInfo = response['Uploads']
   if uploadInfo is None:
       print(bucketName + '桶内没有需要断点上传的对象。')
       raise BaseException
   # 获取需要续传的对象的UploadId和Key,这里以第一个对象为例
   uploadId = uploadInfo[0]['UploadId']
   key = uploadInfo[0]['Key']

   # 获取已经完成上传的分块信息
   response = s3_client.list_parts(Bucket=bucketName, Key=key, UploadId=uploadId)
   finishedPartsInfo = response['Parts']
   # 设置列表,用于记录当前已经完成上传分块的编号。
   finishedPartNumbers = []
   # 记录分块的大小。
   partSize = 0
   # 设置列表,用于收集上传分块的ETag和编号。
   parts = []
   for finishedPart in finishedPartsInfo:
       finishedPartNumbers.append(finishedPart['PartNumber'])
       if partSize < finishedPart['Size']:
           partSize = finishedPart['Size']
       part = {
           'ETag': finishedPart['ETag'],
           'PartNumber': finishedPart['PartNumber']
       }
       parts.append(part)


   # 设置列表,用于收集文件的所有分块
   chunks = []
   # 分块读取文件
   with open(filePos, 'rb') as f:
       while True:
           chunk = f.read(partSize)  # 此处输入分块大小,本示例是5M,分块大小最低为5M
           if not chunk:
               break
           chunks.append(chunk)

   i = 0
   for chunk in chunks:
       i += 1
       # 当该分块已经完成上传时,跳过。
       if i in finishedPartNumbers:
           continue
       # 上传分块
       response = s3_client.upload_part(Bucket=bucketName, Body=chunk,
                                        Key=key, PartNumber=i,
                                        UploadId=uploadId)
       # 记录该分块的ETag和编号
       part = {
           'ETag': response['ETag'],
           'PartNumber': i,
       }
       parts.append(part)

   # 完成整个拼接传入
   s3_client.complete_multipart_upload(
       Bucket=bucketName,
       Key=key,
       MultipartUpload={
           'Parts': parts
       },
       UploadId=uploadId
   )
   print('文件上传成功!')
except BaseException:
   print("上传异常,请检查各个参数后重试。")
finally:
   f.close()



lQZCqQeqst0J