From ae3c73f610bf7356427613cf63a005e6556f1b55 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 16 Jan 2024 17:30:24 +0000 Subject: [PATCH] stats: fix race between ResetCounters and stopAverageLoop called from time.AfterFunc Before this change StatsInfo.ResetCounters() and stopAverageLoop() (when called from time.AfterFunc) could race on StatsInfo.average. This was because the deferred stopAverageLoop accessed StatsInfo.average without locking. For some reason this only ever happened on macOS. This caused the CI to fail on macOS thus causing the macOS builds not to appear. This commit fixes the problem with a bit of extra locking. It also renames all StatsInfo methods that must be called with the lock already held (and so do not take it themselves) to start with an initial underscore as this is the convention we use elsewhere. Fixes #7567 --- fs/accounting/prometheus.go | 2 +- fs/accounting/stats.go | 34 ++++++++++++++++++++++++---------- fs/accounting/stats_test.go | 12 ++++++------ 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/fs/accounting/prometheus.go b/fs/accounting/prometheus.go index 385a94e62..cad792804 100644 --- a/fs/accounting/prometheus.go +++ b/fs/accounting/prometheus.go @@ -90,7 +90,7 @@ func (c *RcloneCollector) Collect(ch chan<- prometheus.Metric) { s.mu.RLock() ch <- prometheus.MustNewConstMetric(c.bytesTransferred, prometheus.CounterValue, float64(s.bytes)) - ch <- prometheus.MustNewConstMetric(c.transferSpeed, prometheus.GaugeValue, s.speed()) + ch <- prometheus.MustNewConstMetric(c.transferSpeed, prometheus.GaugeValue, s._speed()) ch <- prometheus.MustNewConstMetric(c.numOfErrors, prometheus.CounterValue, float64(s.errors)) ch <- prometheus.MustNewConstMetric(c.numOfCheckFiles, prometheus.CounterValue, float64(s.checks)) ch <- prometheus.MustNewConstMetric(c.transferredFiles, prometheus.CounterValue, float64(s.transfers)) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 665f03451..3d89defd4 100644 --- a/fs/accounting/stats.go +++ 
b/fs/accounting/stats.go @@ -139,10 +139,10 @@ func (s *StatsInfo) RemoteStats() (out rc.Params, err error) { return out, nil } -// speed returns the average speed of the transfer in bytes/second +// _speed returns the average speed of the transfer in bytes/second // // Call with lock held -func (s *StatsInfo) speed() float64 { +func (s *StatsInfo) _speed() float64 { return s.average.speed } @@ -213,8 +213,9 @@ func (trs timeRanges) total() (total time.Duration) { // Total duration is union of durations of all transfers belonging to this // object. +// // Needs to be protected by mutex. -func (s *StatsInfo) totalDuration() time.Duration { +func (s *StatsInfo) _totalDuration() time.Duration { // copy of s.oldTimeRanges with extra room for the current transfers timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers)) copy(timeRanges, s.oldTimeRanges) @@ -313,7 +314,7 @@ func (s *StatsInfo) calculateTransferStats() (ts transferStats) { // we take it off here to avoid double counting ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone ts.speed = s.average.speed - dt := s.totalDuration() + dt := s._totalDuration() ts.transferTime = dt.Seconds() return ts @@ -355,20 +356,33 @@ func (s *StatsInfo) averageLoop() { } } +// Start the average loop func (s *StatsInfo) startAverageLoop() { + s.mu.RLock() + defer s.mu.RUnlock() s.average.startOnce.Do(func() { s.average.stopped.Add(1) go s.averageLoop() }) } -func (s *StatsInfo) stopAverageLoop() { +// Stop the average loop +// +// Call with the mutex held +func (s *StatsInfo) _stopAverageLoop() { s.average.stopOnce.Do(func() { close(s.average.stop) s.average.stopped.Wait() }) } +// Stop the average loop +func (s *StatsInfo) stopAverageLoop() { + s.mu.RLock() + defer s.mu.RUnlock() + s._stopAverageLoop() +} + // String convert the StatsInfo to a string for printing func (s *StatsInfo) String() string { // NB if adding more stats in here, remember 
to add them into @@ -682,7 +696,7 @@ func (s *StatsInfo) ResetCounters() { s.startedTransfers = nil s.oldDuration = 0 - s.stopAverageLoop() + s._stopAverageLoop() s.average = averageValues{stop: make(chan bool)} } @@ -822,11 +836,11 @@ func (s *StatsInfo) AddTransfer(transfer *Transfer) { s.mu.Unlock() } -// removeTransfer removes a reference to the started transfer in +// _removeTransfer removes a reference to the started transfer in // position i. // // Must be called with the lock held -func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) { +func (s *StatsInfo) _removeTransfer(transfer *Transfer, i int) { now := time.Now() // add finished transfer onto old time ranges @@ -858,7 +872,7 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) { s.mu.Lock() for i, tr := range s.startedTransfers { if tr == transfer { - s.removeTransfer(tr, i) + s._removeTransfer(tr, i) break } } @@ -876,7 +890,7 @@ func (s *StatsInfo) PruneTransfers() { if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers { for i, tr := range s.startedTransfers { if tr.IsDone() { - s.removeTransfer(tr, i) + s._removeTransfer(tr, i) break } } diff --git a/fs/accounting/stats_test.go b/fs/accounting/stats_test.go index 773311824..35c22037f 100644 --- a/fs/accounting/stats_test.go +++ b/fs/accounting/stats_test.go @@ -157,7 +157,7 @@ func TestStatsTotalDuration(t *testing.T) { s.AddTransfer(tr1) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, 1, len(s.startedTransfers)) @@ -175,7 +175,7 @@ func TestStatsTotalDuration(t *testing.T) { s.AddTransfer(tr1) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, time.Since(time1)/time.Second, total/time.Second) @@ -213,7 +213,7 @@ func TestStatsTotalDuration(t *testing.T) { time.Sleep(time.Millisecond) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, time.Duration(30), total/time.Second) 
@@ -244,7 +244,7 @@ func TestStatsTotalDuration(t *testing.T) { }) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, startTime.Sub(time1)/time.Second, total/time.Second) @@ -449,7 +449,7 @@ func TestPruneTransfers(t *testing.T) { } s.mu.Lock() - assert.Equal(t, time.Duration(test.Transfers)*time.Second, s.totalDuration()) + assert.Equal(t, time.Duration(test.Transfers)*time.Second, s._totalDuration()) assert.Equal(t, test.Transfers, len(s.startedTransfers)) s.mu.Unlock() @@ -458,7 +458,7 @@ func TestPruneTransfers(t *testing.T) { } s.mu.Lock() - assert.Equal(t, time.Duration(test.Transfers)*time.Second, s.totalDuration()) + assert.Equal(t, time.Duration(test.Transfers)*time.Second, s._totalDuration()) assert.Equal(t, test.ExpectedStartedTransfers, len(s.startedTransfers)) s.mu.Unlock()