s3: implement reading and writing of metadata #111

This commit is contained in:
Nick Craig-Wood 2022-05-24 12:32:39 +01:00
parent a692bd2cd4
commit 22abd785eb
3 changed files with 306 additions and 48 deletions

View File

@ -71,6 +71,10 @@ func init() {
}
return nil, fmt.Errorf("unknown state %q", config.State)
},
MetadataInfo: &fs.MetadataInfo{
System: systemMetadataInfo,
Help: `User metadata is stored as x-amz-meta- keys. S3 metadata keys are case insensitive and are always returned in lower case.`,
},
Options: []fs.Option{{
Name: fs.ConfigProvider,
Help: "Choose your S3 provider.",
@ -1984,8 +1988,8 @@ circumstances or for testing.
// Constants
const (
metaMtime = "Mtime" // the meta key to store mtime in - e.g. X-Amz-Meta-Mtime
metaMD5Hash = "Md5chksum" // the meta key to store md5hash in
metaMtime = "mtime" // the meta key to store mtime in - e.g. X-Amz-Meta-Mtime
metaMD5Hash = "md5chksum" // the meta key to store md5hash in
// The maximum size of object we can COPY - this should be 5 GiB but is < 5 GB for b2 compatibility
// See https://forum.rclone.org/t/copying-files-within-a-b2-bucket/16680/76
maxSizeForCopy = 4768 * 1024 * 1024
@ -2000,6 +2004,57 @@ const (
maxExpireDuration = fs.Duration(7 * 24 * time.Hour) // max expiry is 1 week
)
// system metadata keys which this backend owns
var systemMetadataInfo = map[string]fs.MetadataHelp{
"cache-control": {
Help: "Cache-Control header",
Type: "string",
Example: "no-cache",
},
"content-disposition": {
Help: "Content-Disposition header",
Type: "string",
Example: "inline",
},
"content-encoding": {
Help: "Content-Encoding header",
Type: "string",
Example: "gzip",
},
"content-language": {
Help: "Content-Language header",
Type: "string",
Example: "en-US",
},
"content-type": {
Help: "Content-Type header",
Type: "string",
Example: "text/plain",
},
// "tagging": {
// Help: "x-amz-tagging header",
// Type: "string",
// Example: "tag1=value1&tag2=value2",
// },
"tier": {
Help: "Tier of the object",
Type: "string",
Example: "GLACIER",
ReadOnly: true,
},
"mtime": {
Help: "Time of last modification, read from rclone metadata",
Type: "RFC 3339",
Example: "2006-01-02T15:04:05.999999999Z07:00",
},
"btime": {
Help: "Time of file birth (creation) read from Last-Modified header",
Type: "RFC 3339",
Example: "2006-01-02T15:04:05.999999999Z07:00",
ReadOnly: true,
},
}
// Options defines the configuration for this backend
type Options struct {
Provider string `config:"provider"`
@ -2079,7 +2134,13 @@ type Object struct {
lastModified time.Time // Last modified
meta map[string]string // The object metadata if known - may be nil - with lower case keys
mimeType string // MimeType of object - may be ""
storageClass string // e.g. GLACIER
// Metadata as pointers to strings as they often won't be present
storageClass *string // e.g. GLACIER
cacheControl *string // Cache-Control: header
contentDisposition *string // Content-Disposition: header
contentEncoding *string // Content-Encoding: header
contentLanguage *string // Content-Language: header
}
// ------------------------------------------------------------
@ -2573,6 +2634,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
f.features = (&fs.Features{
ReadMimeType: true,
WriteMimeType: true,
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
BucketBased: true,
BucketBasedRootOK: true,
SetTier: true,
@ -2623,7 +2687,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje
}
o.setMD5FromEtag(aws.StringValue(info.ETag))
o.bytes = aws.Int64Value(info.Size)
o.storageClass = aws.StringValue(info.StorageClass)
o.storageClass = info.StorageClass
} else if !o.fs.opt.NoHeadObject {
err := o.readMetaData(ctx) // reads info and meta, returning an error
if err != nil {
@ -3491,7 +3555,7 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str
st.Status = "Not an S3 object"
return
}
if o.storageClass != "GLACIER" && o.storageClass != "DEEP_ARCHIVE" {
if o.storageClass == nil || (*o.storageClass != "GLACIER" && *o.storageClass != "DEEP_ARCHIVE") {
st.Status = "Not GLACIER or DEEP_ARCHIVE storage class"
return
}
@ -3749,7 +3813,8 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
if err != nil {
return err
}
o.setMetaData(resp.ETag, resp.ContentLength, resp.LastModified, resp.Metadata, resp.ContentType, resp.StorageClass)
o.setMetaData(resp)
// resp.ETag, resp.ContentLength, resp.LastModified, resp.Metadata, resp.ContentType, resp.StorageClass)
return nil
}
@ -3774,14 +3839,14 @@ func mapToS3Metadata(meta map[string]string) map[string]*string {
return s3Meta
}
func (o *Object) setMetaData(etag *string, contentLength *int64, lastModified *time.Time, meta map[string]*string, mimeType *string, storageClass *string) {
func (o *Object) setMetaData(resp *s3.HeadObjectOutput) {
// Ignore missing Content-Length assuming it is 0
// Some versions of ceph do this due their apache proxies
if contentLength != nil {
o.bytes = *contentLength
if resp.ContentLength != nil {
o.bytes = *resp.ContentLength
}
o.setMD5FromEtag(aws.StringValue(etag))
o.meta = s3MetadataToMap(meta)
o.setMD5FromEtag(aws.StringValue(resp.ETag))
o.meta = s3MetadataToMap(resp.Metadata)
// Read MD5 from metadata if present
if md5sumBase64, ok := o.meta[metaMD5Hash]; ok {
md5sumBytes, err := base64.StdEncoding.DecodeString(md5sumBase64)
@ -3793,14 +3858,20 @@ func (o *Object) setMetaData(etag *string, contentLength *int64, lastModified *t
o.md5 = hex.EncodeToString(md5sumBytes)
}
}
o.storageClass = aws.StringValue(storageClass)
if lastModified == nil {
if resp.LastModified == nil {
o.lastModified = time.Now()
fs.Logf(o, "Failed to read last modified")
} else {
o.lastModified = *lastModified
o.lastModified = *resp.LastModified
}
o.mimeType = aws.StringValue(mimeType)
o.mimeType = aws.StringValue(resp.ContentType)
// Set system metadata
o.storageClass = resp.StorageClass
o.cacheControl = resp.CacheControl
o.contentDisposition = resp.ContentDisposition
o.contentEncoding = resp.ContentEncoding
o.contentLanguage = resp.ContentLanguage
}
// ModTime returns the modification time of the object
@ -3839,7 +3910,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
o.meta[metaMtime] = swift.TimeToFloatString(modTime)
// Can't update metadata here, so return this error to force a recopy
if o.storageClass == "GLACIER" || o.storageClass == "DEEP_ARCHIVE" {
if o.storageClass != nil && (*o.storageClass == "GLACIER" || *o.storageClass == "DEEP_ARCHIVE") {
return fs.ErrorCantSetModTime
}
@ -3900,17 +3971,34 @@ func (o *Object) downloadFromURL(ctx context.Context, bucketPath string, options
metaData := make(map[string]*string)
for key, value := range resp.Header {
if strings.HasPrefix(key, "x-amz-meta") {
key = strings.ToLower(key)
if strings.HasPrefix(key, "x-amz-meta-") {
metaKey := strings.TrimPrefix(key, "x-amz-meta-")
metaData[strings.Title(metaKey)] = &value[0]
metaData[metaKey] = &value[0]
}
}
storageClass := resp.Header.Get("X-Amz-Storage-Class")
contentType := resp.Header.Get("Content-Type")
etag := resp.Header.Get("Etag")
header := func(k string) *string {
v := resp.Header.Get(k)
if v == "" {
return nil
}
return &v
}
o.setMetaData(&etag, contentLength, &lastModified, metaData, &contentType, &storageClass)
var head = s3.HeadObjectOutput{
ETag: header("Etag"),
ContentLength: contentLength,
LastModified: &lastModified,
Metadata: metaData,
CacheControl: header("Cache-Control"),
ContentDisposition: header("Content-Disposition"),
ContentEncoding: header("Content-Encoding"),
ContentLanguage: header("Content-Language"),
ContentType: header("Content-Type"),
StorageClass: header("X-Amz-Storage-Class"),
}
o.setMetaData(&head)
return resp.Body, err
}
@ -3985,7 +4073,10 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
fs.Debugf(o, "Failed to find length in %q", contentRange)
}
}
o.setMetaData(resp.ETag, size, resp.LastModified, resp.Metadata, resp.ContentType, resp.StorageClass)
var head s3.HeadObjectOutput
structs.SetFrom(&head, resp)
head.ContentLength = size
o.setMetaData(&head)
return resp.Body, nil
}
@ -4322,11 +4413,56 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
// Set the mtime in the meta data
metadata := map[string]*string{
metaMtime: aws.String(swift.TimeToFloatString(modTime)),
req := s3.PutObjectInput{
Bucket: &bucket,
ACL: &o.fs.opt.ACL,
Key: &bucketPath,
}
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to read metadata from source object: %w", err)
}
req.Metadata = make(map[string]*string, len(meta)+2)
// merge metadata into request and user metadata
for k, v := range meta {
pv := aws.String(v)
k = strings.ToLower(k)
switch k {
case "cache-control":
req.CacheControl = pv
case "content-disposition":
req.ContentDisposition = pv
case "content-encoding":
req.ContentEncoding = pv
case "content-language":
req.ContentLanguage = pv
case "content-type":
req.ContentType = pv
case "x-amz-tagging":
req.Tagging = pv
case "tier":
// ignore
case "mtime":
// mtime in meta overrides source ModTime
metaModTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err)
} else {
modTime = metaModTime
}
case "btime":
// write as metadata since we can't set it
req.Metadata[k] = pv
default:
req.Metadata[k] = pv
}
}
// Set the mtime in the meta data
req.Metadata[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
// read the md5sum if available
// - for non multipart
// - so we can add a ContentMD5
@ -4346,20 +4482,15 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// - a multipart upload
// - the Etag is not an MD5, eg when using SSE/SSE-C
// provided checksums aren't disabled
metadata[metaMD5Hash] = &md5sumBase64
req.Metadata[metaMD5Hash] = &md5sumBase64
}
}
}
}
// Guess the content type
mimeType := fs.MimeType(ctx, src)
req := s3.PutObjectInput{
Bucket: &bucket,
ACL: &o.fs.opt.ACL,
Key: &bucketPath,
ContentType: &mimeType,
Metadata: metadata,
// Set the content type it it isn't set already
if req.ContentType == nil {
req.ContentType = aws.String(fs.MimeType(ctx, src))
}
if size >= 0 {
req.ContentLength = &size
@ -4438,19 +4569,19 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// so make up the object as best we can assuming it got
// uploaded properly. If size < 0 then we need to do the HEAD.
if o.fs.opt.NoHead && size >= 0 {
o.md5 = md5sumHex
o.bytes = size
o.lastModified = time.Now()
o.meta = s3MetadataToMap(req.Metadata)
o.mimeType = aws.StringValue(req.ContentType)
o.storageClass = aws.StringValue(req.StorageClass)
var head s3.HeadObjectOutput
structs.SetFrom(&head, req)
head.ETag = &md5sumHex // doesn't matter quotes are misssing
head.ContentLength = &size
// If we have done a single part PUT request then we can read these
if gotEtag != "" {
o.setMD5FromEtag(gotEtag)
head.ETag = &gotEtag
}
if !o.lastModified.IsZero() {
o.lastModified = lastModified
if lastModified.IsZero() {
lastModified = time.Now()
}
head.LastModified = &lastModified
o.setMetaData(&head)
return nil
}
@ -4460,7 +4591,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return err
}
o.setMetaData(head.ETag, head.ContentLength, head.LastModified, head.Metadata, head.ContentType, head.StorageClass)
o.setMetaData(head)
if o.fs.opt.UseMultipartEtag.Value && !o.fs.etagIsNotMD5 && wantETag != "" && head.ETag != nil && *head.ETag != "" {
gotETag := strings.Trim(strings.ToLower(*head.ETag), `"`)
if wantETag != gotETag {
@ -4511,16 +4642,61 @@ func (o *Object) SetTier(tier string) (err error) {
if err != nil {
return err
}
o.storageClass = tier
o.storageClass = &tier
return err
}
// GetTier returns storage class as string
func (o *Object) GetTier() string {
if o.storageClass == "" {
if o.storageClass == nil || *o.storageClass == "" {
return "STANDARD"
}
return o.storageClass
return *o.storageClass
}
// Metadata returns metadata for an object
//
// It should return nil if there is no Metadata
func (o *Object) Metadata(ctx context.Context) (metadata fs.Metadata, err error) {
err = o.readMetaData(ctx)
if err != nil {
return nil, err
}
metadata = make(fs.Metadata, len(o.meta)+7)
for k, v := range o.meta {
switch k {
case metaMtime:
if modTime, err := swift.FloatStringToTime(v); err == nil {
metadata["mtime"] = modTime.Format(time.RFC3339Nano)
}
case metaMD5Hash:
// don't write hash metadata
default:
metadata[k] = v
}
}
if o.mimeType != "" {
metadata["content-type"] = o.mimeType
}
// metadata["x-amz-tagging"] = ""
if !o.lastModified.IsZero() {
metadata["btime"] = o.lastModified.Format(time.RFC3339Nano)
}
// Set system metadata
setMetadata := func(k string, v *string) {
if v == nil || *v == "" {
return
}
metadata[k] = *v
}
setMetadata("cache-control", o.cacheControl)
setMetadata("content-disposition", o.contentDisposition)
setMetadata("content-encoding", o.contentEncoding)
setMetadata("content-language", o.contentLanguage)
setMetadata("tier", o.storageClass)
return metadata, nil
}
// Check the interfaces are satisfied
@ -4535,4 +4711,5 @@ var (
_ fs.MimeTyper = &Object{}
_ fs.GetTierer = &Object{}
_ fs.SetTierer = &Object{}
_ fs.Metadataer = &Object{}
)

View File

@ -0,0 +1,61 @@
package s3
import (
"context"
"fmt"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests"
"github.com/rclone/rclone/lib/random"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func (f *Fs) InternalTestMetadata(t *testing.T) {
ctx := context.Background()
contents := random.String(100)
item := fstest.NewItem("test-metadata", contents, fstest.Time("2001-05-06T04:05:06.499999999Z"))
btime := time.Now()
metadata := fs.Metadata{
"cache-control": "no-cache",
"content-disposition": "inline",
"content-encoding": "gzip",
"content-language": "en-US",
"content-type": "text/plain",
"mtime": "2009-05-06T04:05:06.499999999Z",
// "tier" - read only
// "btime" - read only
}
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, "text/html", metadata)
defer func() {
assert.NoError(t, obj.Remove(ctx))
}()
o := obj.(*Object)
gotMetadata, err := o.Metadata(ctx)
require.NoError(t, err)
for k, v := range metadata {
got := gotMetadata[k]
switch k {
case "mtime":
assert.True(t, fstest.Time(v).Equal(fstest.Time(got)))
case "btime":
gotBtime := fstest.Time(got)
dt := gotBtime.Sub(btime)
assert.True(t, dt < time.Minute && dt > -time.Minute, fmt.Sprintf("btime more than 1 minute out want %v got %v delta %v", btime, gotBtime, dt))
assert.True(t, fstest.Time(v).Equal(fstest.Time(got)))
case "tier":
assert.NotEqual(t, "", got)
default:
assert.Equal(t, v, got, k)
}
}
}
func (f *Fs) InternalTest(t *testing.T) {
t.Run("Metadata", f.InternalTestMetadata)
}
var _ fstests.InternalTester = (*Fs)(nil)

View File

@ -2369,6 +2369,26 @@ Properties:
- Type: Tristate
- Default: unset
### Metadata
User metadata is stored as x-amz-meta- keys. S3 metadata keys are case insensitive and are always returned in lower case.
Here are the possible system metadata items for the s3 backend.
| Name | Help | Type | Example | Read Only |
|------|------|------|---------|-----------|
| btime | Time of file birth (creation) read from Last-Modified header | RFC 3339 | 2006-01-02T15:04:05.999999999Z07:00 | **Y** |
| cache-control | Cache-Control header | string | no-cache | N |
| content-disposition | Content-Disposition header | string | inline | N |
| content-encoding | Content-Encoding header | string | gzip | N |
| content-language | Content-Language header | string | en-US | N |
| content-type | Content-Type header | string | text/plain | N |
| mtime | Time of last modification, read from rclone metadata | RFC 3339 | 2006-01-02T15:04:05.999999999Z07:00 | N |
| tier | Tier of the object | string | GLACIER | **Y** |
See the [metadata](/docs/#metadata) docs for more info.
## Backend commands
Here are the commands specific to the s3 backend.