fs: use atomic types

Roberto Ricci 2023-08-18 16:56:26 +02:00 committed by Nick Craig-Wood
parent 50d0597d56
commit 01a155fb00
5 changed files with 62 additions and 62 deletions
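The change is mechanical throughout: counters declared as plain int32/int64 and updated through the function-style sync/atomic API become the typed atomic.Int32/atomic.Int64 values introduced in Go 1.19. Every access then goes through a method (Add, Load, Store, CompareAndSwap), so a non-atomic read or write can no longer slip in; the typed values also get go vet's copy checking and, for the 64-bit types, guaranteed alignment on 32-bit platforms. A minimal standalone sketch of the before/after (illustrative names, not rclone code):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var oldCounter int32        // old style: plain integer
	var newCounter atomic.Int32 // new style: typed atomic value (Go 1.19+)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&oldCounter, 1) // old: pass a pointer to the package function
			newCounter.Add(1)               // new: call a method on the value
		}()
	}
	wg.Wait()

	fmt.Println(atomic.LoadInt32(&oldCounter)) // old read: 4
	fmt.Println(newCounter.Load())             // new read: 4
}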

View File

@@ -51,11 +51,11 @@ type checkMarch struct {
     ioMu            sync.Mutex
     wg              sync.WaitGroup
     tokens          chan struct{}
-    differences     int32
-    noHashes        int32
-    srcFilesMissing int32
-    dstFilesMissing int32
-    matches         int32
+    differences     atomic.Int32
+    noHashes        atomic.Int32
+    srcFilesMissing atomic.Int32
+    dstFilesMissing atomic.Int32
+    matches         atomic.Int32
     opt             CheckOpt
 }
@@ -83,8 +83,8 @@ func (c *checkMarch) DstOnly(dst fs.DirEntry) (recurse bool) {
         err := fmt.Errorf("file not in %v", c.opt.Fsrc)
         fs.Errorf(dst, "%v", err)
         _ = fs.CountError(err)
-        atomic.AddInt32(&c.differences, 1)
-        atomic.AddInt32(&c.srcFilesMissing, 1)
+        c.differences.Add(1)
+        c.srcFilesMissing.Add(1)
         c.report(dst, c.opt.MissingOnSrc, '-')
     case fs.Directory:
         // Do the same thing to the entire contents of the directory
@@ -105,8 +105,8 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) {
         err := fmt.Errorf("file not in %v", c.opt.Fdst)
         fs.Errorf(src, "%v", err)
         _ = fs.CountError(err)
-        atomic.AddInt32(&c.differences, 1)
-        atomic.AddInt32(&c.dstFilesMissing, 1)
+        c.differences.Add(1)
+        c.dstFilesMissing.Add(1)
         c.report(src, c.opt.MissingOnDst, '+')
     case fs.Directory:
         // Do the same thing to the entire contents of the directory
@@ -157,16 +157,16 @@ func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse b
             _ = fs.CountError(err)
             c.report(src, c.opt.Error, '!')
         } else if differ {
-            atomic.AddInt32(&c.differences, 1)
+            c.differences.Add(1)
             err := errors.New("files differ")
             // the checkFn has already logged the reason
             _ = fs.CountError(err)
             c.report(src, c.opt.Differ, '*')
         } else {
-            atomic.AddInt32(&c.matches, 1)
+            c.matches.Add(1)
             c.report(src, c.opt.Match, '=')
             if noHash {
-                atomic.AddInt32(&c.noHashes, 1)
+                c.noHashes.Add(1)
                 fs.Debugf(dstX, "OK - could not check hash")
             } else {
                 fs.Debugf(dstX, "OK")
@@ -177,8 +177,8 @@ func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse b
             err := fmt.Errorf("is file on %v but directory on %v", c.opt.Fsrc, c.opt.Fdst)
             fs.Errorf(src, "%v", err)
             _ = fs.CountError(err)
-            atomic.AddInt32(&c.differences, 1)
-            atomic.AddInt32(&c.dstFilesMissing, 1)
+            c.differences.Add(1)
+            c.dstFilesMissing.Add(1)
             c.report(src, c.opt.MissingOnDst, '+')
         }
     case fs.Directory:
@@ -190,8 +190,8 @@ func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse b
         err := fmt.Errorf("is file on %v but directory on %v", c.opt.Fdst, c.opt.Fsrc)
         fs.Errorf(dst, "%v", err)
         _ = fs.CountError(err)
-        atomic.AddInt32(&c.differences, 1)
-        atomic.AddInt32(&c.srcFilesMissing, 1)
+        c.differences.Add(1)
+        c.srcFilesMissing.Add(1)
         c.report(dst, c.opt.MissingOnSrc, '-')
     default:
@@ -235,33 +235,33 @@ func CheckFn(ctx context.Context, opt *CheckOpt) error {
 }
 func (c *checkMarch) reportResults(ctx context.Context, err error) error {
-    if c.dstFilesMissing > 0 {
-        fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing)
+    if c.dstFilesMissing.Load() > 0 {
+        fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing.Load())
     }
-    if c.srcFilesMissing > 0 {
+    if c.srcFilesMissing.Load() > 0 {
         entity := "files"
         if c.opt.Fsrc == nil {
             entity = "hashes"
         }
-        fs.Logf(c.opt.Fsrc, "%d %s missing", c.srcFilesMissing, entity)
+        fs.Logf(c.opt.Fsrc, "%d %s missing", c.srcFilesMissing.Load(), entity)
     }
     fs.Logf(c.opt.Fdst, "%d differences found", accounting.Stats(ctx).GetErrors())
     if errs := accounting.Stats(ctx).GetErrors(); errs > 0 {
         fs.Logf(c.opt.Fdst, "%d errors while checking", errs)
     }
-    if c.noHashes > 0 {
-        fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes)
+    if c.noHashes.Load() > 0 {
+        fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes.Load())
     }
-    if c.matches > 0 {
-        fs.Logf(c.opt.Fdst, "%d matching files", c.matches)
+    if c.matches.Load() > 0 {
+        fs.Logf(c.opt.Fdst, "%d matching files", c.matches.Load())
     }
     if err != nil {
         return err
     }
-    if c.differences > 0 {
+    if c.differences.Load() > 0 {
         // Return an already counted error so we don't double count this error too
-        err = fserrors.FsError(fmt.Errorf("%d differences found", c.differences))
+        err = fserrors.FsError(fmt.Errorf("%d differences found", c.differences.Load()))
         fserrors.Count(err)
         return err
     }
@@ -430,7 +430,7 @@ func CheckSum(ctx context.Context, fsrc, fsum fs.Fs, sumFile string, hashType ha
         if lastErr == nil {
             lastErr = err
         }
-        atomic.AddInt32(&c.dstFilesMissing, 1)
+        c.dstFilesMissing.Add(1)
         c.reportFilename(filename, opt.MissingOnDst, '+')
     }
@@ -457,8 +457,8 @@ func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool,
         err = errors.New("sum not found")
         _ = fs.CountError(err)
         fs.Errorf(obj, "%v", err)
-        atomic.AddInt32(&c.differences, 1)
-        atomic.AddInt32(&c.srcFilesMissing, 1)
+        c.differences.Add(1)
+        c.srcFilesMissing.Add(1)
         c.report(obj, c.opt.MissingOnSrc, '-')
         return
     }
@@ -515,12 +515,12 @@ func (c *checkMarch) matchSum(ctx context.Context, sumHash, objHash string, obj
     case objHash == "":
         fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash)
         fs.Debugf(obj, "%v - could not check hash (%v)", hashType, c.opt.Fdst)
-        atomic.AddInt32(&c.noHashes, 1)
-        atomic.AddInt32(&c.matches, 1)
+        c.noHashes.Add(1)
+        c.matches.Add(1)
         c.report(obj, c.opt.Match, '=')
     case objHash == sumHash:
         fs.Debugf(obj, "%v = %s OK", hashType, sumHash)
-        atomic.AddInt32(&c.matches, 1)
+        c.matches.Add(1)
         c.report(obj, c.opt.Match, '=')
     default:
         err = errors.New("files differ")
@@ -528,7 +528,7 @@ func (c *checkMarch) matchSum(ctx context.Context, sumHash, objHash string, obj
         fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash)
         fs.Debugf(obj, "%v = %s (%v)", hashType, objHash, c.opt.Fdst)
         fs.Errorf(obj, "%v", err)
-        atomic.AddInt32(&c.differences, 1)
+        c.differences.Add(1)
         c.report(obj, c.opt.Differ, '*')
     }
 }
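One detail worth noting in the reportResults changes above: once a counter is an atomic.Int32 it can no longer be handed straight to a %d verb, both because it would format as a struct rather than a plain number and because copying it into the variadic arguments trips go vet's copylocks check. Hence every read in a condition or log call becomes an explicit .Load(). A small illustrative sketch (hypothetical names, not the rclone code):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var matches atomic.Int32
	matches.Add(3)

	// Wrong: copies the atomic value (go vet: copylocks) and prints it
	// as a struct rather than a number.
	// fmt.Printf("%d matching files\n", matches)

	// Right: read the value explicitly.
	if matches.Load() > 0 {
		fmt.Printf("%d matching files\n", matches.Load())
	}
}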

View File

@@ -764,8 +764,8 @@ func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, b
     var wg sync.WaitGroup
     ci := fs.GetConfig(ctx)
     wg.Add(ci.Checkers)
-    var errorCount int32
-    var fatalErrorCount int32
+    var errorCount atomic.Int32
+    var fatalErrorCount atomic.Int32
     for i := 0; i < ci.Checkers; i++ {
         go func() {
@@ -773,10 +773,10 @@ func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, b
             for dst := range toBeDeleted {
                 err := DeleteFileWithBackupDir(ctx, dst, backupDir)
                 if err != nil {
-                    atomic.AddInt32(&errorCount, 1)
+                    errorCount.Add(1)
                     if fserrors.IsFatalError(err) {
                         fs.Errorf(nil, "Got fatal error on delete: %s", err)
-                        atomic.AddInt32(&fatalErrorCount, 1)
+                        fatalErrorCount.Add(1)
                         return
                     }
                 }
@@ -785,9 +785,9 @@ func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, b
     }
     fs.Debugf(nil, "Waiting for deletions to finish")
     wg.Wait()
-    if errorCount > 0 {
-        err := fmt.Errorf("failed to delete %d files", errorCount)
-        if fatalErrorCount > 0 {
+    if errorCount.Load() > 0 {
+        err := fmt.Errorf("failed to delete %d files", errorCount.Load())
+        if fatalErrorCount.Load() > 0 {
             return fserrors.FatalError(err)
         }
         return err

View File

@@ -122,7 +122,7 @@ type Jobs struct {
 var (
     running   = newJobs()
-    jobID     = int64(0)
+    jobID     atomic.Int64
     executeID = uuid.New().String()
 )
@@ -141,7 +141,7 @@ func SetOpt(opt *rc.Options) {
 // SetInitialJobID allows for setting jobID before starting any jobs.
 func SetInitialJobID(id int64) {
-    if !atomic.CompareAndSwapInt64(&jobID, 0, id) {
+    if !jobID.CompareAndSwap(0, id) {
         panic("Setting jobID is only possible before starting any jobs")
     }
 }
@@ -264,7 +264,7 @@ var jobKey = jobKeyType{}
 // NewJob creates a Job and executes it, possibly in the background if _async is set
 func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) {
-    id := atomic.AddInt64(&jobID, 1)
+    id := jobID.Add(1)
     in = in.Copy() // copy input so we can change it
     ctx, isAsync, err := getAsync(ctx, in)
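The jobID changes above show the less common operations: the package-level counter becomes an atomic.Int64, SetInitialJobID uses CompareAndSwap to set it only while it is still zero, and NewJob allocates IDs with Add(1). A reduced standalone sketch of that scheme (hypothetical names, not the rc/jobs package):

package main

import (
	"fmt"
	"sync/atomic"
)

// nextID is a package-level ID counter; its zero value is ready to use.
var nextID atomic.Int64

// setInitialID only succeeds before any ID has been handed out.
func setInitialID(id int64) {
	if !nextID.CompareAndSwap(0, id) {
		panic("setInitialID must be called before any IDs are allocated")
	}
}

// newID hands out the next ID atomically.
func newID() int64 {
	return nextID.Add(1)
}

func main() {
	setInitialID(500)
	fmt.Println(newID()) // 501
	fmt.Println(newID()) // 502
}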

View File

@@ -229,7 +229,7 @@ func TestJobRunPanic(t *testing.T) {
 func TestJobsNewJob(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     jobs := newJobs()
     job, out, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true})
     require.NoError(t, err)
@@ -241,7 +241,7 @@ func TestJobsNewJob(t *testing.T) {
 func TestStartJob(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     job, out, err := NewJob(ctx, longFn, rc.Params{"_async": true})
     assert.NoError(t, err)
     assert.Equal(t, rc.Params{"jobid": int64(1)}, out)
@@ -249,7 +249,7 @@ func TestStartJob(t *testing.T) {
 }
 func TestExecuteJob(t *testing.T) {
-    jobID = 0
+    jobID.Store(0)
     job, out, err := NewJob(context.Background(), shortFn, rc.Params{})
     assert.NoError(t, err)
     assert.Equal(t, int64(1), job.ID)
@@ -258,7 +258,7 @@ func TestExecuteJob(t *testing.T) {
 func TestExecuteJobWithConfig(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     called := false
     jobFn := func(ctx context.Context, in rc.Params) (rc.Params, error) {
         ci := fs.GetConfig(ctx)
@@ -274,7 +274,7 @@ func TestExecuteJobWithConfig(t *testing.T) {
     require.NoError(t, err)
     assert.Equal(t, true, called)
     // Retest with string parameter
-    jobID = 0
+    jobID.Store(0)
     called = false
     _, _, err = NewJob(ctx, jobFn, rc.Params{
         "_config": `{"BufferSize": "42M"}`,
@@ -289,7 +289,7 @@ func TestExecuteJobWithConfig(t *testing.T) {
 func TestExecuteJobWithFilter(t *testing.T) {
     ctx := context.Background()
     called := false
-    jobID = 0
+    jobID.Store(0)
     jobFn := func(ctx context.Context, in rc.Params) (rc.Params, error) {
         fi := filter.GetConfig(ctx)
         assert.Equal(t, fs.SizeSuffix(1024), fi.Opt.MaxSize)
@@ -309,7 +309,7 @@ func TestExecuteJobWithFilter(t *testing.T) {
 func TestExecuteJobWithGroup(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     called := false
     jobFn := func(ctx context.Context, in rc.Params) (rc.Params, error) {
         called = true
@@ -327,7 +327,7 @@ func TestExecuteJobWithGroup(t *testing.T) {
 func TestExecuteJobErrorPropagation(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     testErr := errors.New("test error")
     errorFn := func(ctx context.Context, in rc.Params) (out rc.Params, err error) {
@@ -339,7 +339,7 @@ func TestExecuteJobErrorPropagation(t *testing.T) {
 func TestRcJobStatus(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true})
     assert.NoError(t, err)
@@ -367,7 +367,7 @@ func TestRcJobStatus(t *testing.T) {
 func TestRcJobList(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true})
     assert.NoError(t, err)
@@ -396,7 +396,7 @@ func TestRcJobList(t *testing.T) {
 func TestRcAsyncJobStop(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     _, _, err := NewJob(ctx, ctxFn, rc.Params{"_async": true})
     assert.NoError(t, err)
@@ -434,7 +434,7 @@ func TestRcAsyncJobStop(t *testing.T) {
 func TestRcSyncJobStop(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     go func() {
-        jobID = 0
+        jobID.Store(0)
         job, out, err := NewJob(ctx, ctxFn, rc.Params{})
         assert.Error(t, err)
         assert.Equal(t, int64(1), job.ID)
@@ -477,7 +477,7 @@ func TestRcSyncJobStop(t *testing.T) {
 func TestRcJobStopGroup(t *testing.T) {
     ctx := context.Background()
-    jobID = 0
+    jobID.Store(0)
     _, _, err := NewJob(ctx, ctxFn, rc.Params{
         "_async": true,
         "_group": "myparty",
@@ -518,7 +518,7 @@ func TestRcJobStopGroup(t *testing.T) {
 }
 func TestOnFinish(t *testing.T) {
-    jobID = 0
+    jobID.Store(0)
     done := make(chan struct{})
     ctx, cancel := context.WithCancel(context.Background())
     job, _, err := NewJob(ctx, ctxParmFn(ctx, false), rc.Params{"_async": true})
@@ -538,7 +538,7 @@ func TestOnFinish(t *testing.T) {
 }
 func TestOnFinishAlreadyFinished(t *testing.T) {
-    jobID = 0
+    jobID.Store(0)
     done := make(chan struct{})
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
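In the tests above, the old code could reset the counter with a plain assignment (jobID = 0); with atomic.Int64 the integer lives in an unexported field, so the reset becomes jobID.Store(0). A tiny sketch of that reset pattern (illustrative name, not the real test file):

package main

import (
	"fmt"
	"sync/atomic"
)

var jobCounter atomic.Int64 // stand-in for a package-level ID counter

func main() {
	jobCounter.Add(41)
	fmt.Println(jobCounter.Load()) // 41

	// Reset between test cases: Store replaces the old plain assignment.
	jobCounter.Store(0)
	fmt.Println(jobCounter.Load()) // 0
}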

View File

@@ -112,7 +112,7 @@ func TestPipeConcurrent(t *testing.T) {
     obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
     pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
     ctx := context.Background()
-    var count int64
+    var count atomic.Int64
     for j := 0; j < readWriters; j++ {
         wg.Add(2)
@@ -123,7 +123,7 @@ func TestPipeConcurrent(t *testing.T) {
                 pair2, ok := p.Get(ctx)
                 assert.Equal(t, pair1, pair2)
                 assert.Equal(t, true, ok)
-                atomic.AddInt64(&count, -1)
+                count.Add(-1)
             }
         }()
         go func() {
@@ -132,13 +132,13 @@ func TestPipeConcurrent(t *testing.T) {
                 // Put an object
                 ok := p.Put(ctx, pair1)
                 assert.Equal(t, true, ok)
-                atomic.AddInt64(&count, 1)
+                count.Add(1)
             }
         }()
     }
     wg.Wait()
-    assert.Equal(t, int64(0), count)
+    assert.Equal(t, int64(0), count.Load())
 }
 func TestPipeOrderBy(t *testing.T) {