diff --git a/fs/accounting/prometheus.go b/fs/accounting/prometheus.go index 385a94e62..cad792804 100644 --- a/fs/accounting/prometheus.go +++ b/fs/accounting/prometheus.go @@ -90,7 +90,7 @@ func (c *RcloneCollector) Collect(ch chan<- prometheus.Metric) { s.mu.RLock() ch <- prometheus.MustNewConstMetric(c.bytesTransferred, prometheus.CounterValue, float64(s.bytes)) - ch <- prometheus.MustNewConstMetric(c.transferSpeed, prometheus.GaugeValue, s.speed()) + ch <- prometheus.MustNewConstMetric(c.transferSpeed, prometheus.GaugeValue, s._speed()) ch <- prometheus.MustNewConstMetric(c.numOfErrors, prometheus.CounterValue, float64(s.errors)) ch <- prometheus.MustNewConstMetric(c.numOfCheckFiles, prometheus.CounterValue, float64(s.checks)) ch <- prometheus.MustNewConstMetric(c.transferredFiles, prometheus.CounterValue, float64(s.transfers)) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 665f03451..3d89defd4 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -139,10 +139,10 @@ func (s *StatsInfo) RemoteStats() (out rc.Params, err error) { return out, nil } -// speed returns the average speed of the transfer in bytes/second +// _speed returns the average speed of the transfer in bytes/second // // Call with lock held -func (s *StatsInfo) speed() float64 { +func (s *StatsInfo) _speed() float64 { return s.average.speed } @@ -213,8 +213,9 @@ func (trs timeRanges) total() (total time.Duration) { // Total duration is union of durations of all transfers belonging to this // object. +// // Needs to be protected by mutex. -func (s *StatsInfo) totalDuration() time.Duration { +func (s *StatsInfo) _totalDuration() time.Duration { // copy of s.oldTimeRanges with extra room for the current transfers timeRanges := make(timeRanges, len(s.oldTimeRanges), len(s.oldTimeRanges)+len(s.startedTransfers)) copy(timeRanges, s.oldTimeRanges) @@ -313,7 +314,7 @@ func (s *StatsInfo) calculateTransferStats() (ts transferStats) { // we take it off here to avoid double counting ts.totalBytes = s.transferQueueSize + s.bytes + transferringBytesTotal - transferringBytesDone ts.speed = s.average.speed - dt := s.totalDuration() + dt := s._totalDuration() ts.transferTime = dt.Seconds() return ts @@ -355,20 +356,33 @@ func (s *StatsInfo) averageLoop() { } } +// Start the average loop func (s *StatsInfo) startAverageLoop() { + s.mu.RLock() + defer s.mu.RUnlock() s.average.startOnce.Do(func() { s.average.stopped.Add(1) go s.averageLoop() }) } -func (s *StatsInfo) stopAverageLoop() { +// Stop the average loop +// +// Call with the mutex held +func (s *StatsInfo) _stopAverageLoop() { s.average.stopOnce.Do(func() { close(s.average.stop) s.average.stopped.Wait() }) } +// Stop the average loop +func (s *StatsInfo) stopAverageLoop() { + s.mu.RLock() + defer s.mu.RUnlock() + s._stopAverageLoop() +} + // String convert the StatsInfo to a string for printing func (s *StatsInfo) String() string { // NB if adding more stats in here, remember to add them into @@ -682,7 +696,7 @@ func (s *StatsInfo) ResetCounters() { s.startedTransfers = nil s.oldDuration = 0 - s.stopAverageLoop() + s._stopAverageLoop() s.average = averageValues{stop: make(chan bool)} } @@ -822,11 +836,11 @@ func (s *StatsInfo) AddTransfer(transfer *Transfer) { s.mu.Unlock() } -// removeTransfer removes a reference to the started transfer in +// _removeTransfer removes a reference to the started transfer in // position i. // // Must be called with the lock held -func (s *StatsInfo) removeTransfer(transfer *Transfer, i int) { +func (s *StatsInfo) _removeTransfer(transfer *Transfer, i int) { now := time.Now() // add finished transfer onto old time ranges @@ -858,7 +872,7 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) { s.mu.Lock() for i, tr := range s.startedTransfers { if tr == transfer { - s.removeTransfer(tr, i) + s._removeTransfer(tr, i) break } } @@ -876,7 +890,7 @@ func (s *StatsInfo) PruneTransfers() { if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers { for i, tr := range s.startedTransfers { if tr.IsDone() { - s.removeTransfer(tr, i) + s._removeTransfer(tr, i) break } } diff --git a/fs/accounting/stats_test.go b/fs/accounting/stats_test.go index 773311824..35c22037f 100644 --- a/fs/accounting/stats_test.go +++ b/fs/accounting/stats_test.go @@ -157,7 +157,7 @@ func TestStatsTotalDuration(t *testing.T) { s.AddTransfer(tr1) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, 1, len(s.startedTransfers)) @@ -175,7 +175,7 @@ func TestStatsTotalDuration(t *testing.T) { s.AddTransfer(tr1) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, time.Since(time1)/time.Second, total/time.Second) @@ -213,7 +213,7 @@ func TestStatsTotalDuration(t *testing.T) { time.Sleep(time.Millisecond) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, time.Duration(30), total/time.Second) @@ -244,7 +244,7 @@ func TestStatsTotalDuration(t *testing.T) { }) s.mu.Lock() - total := s.totalDuration() + total := s._totalDuration() s.mu.Unlock() assert.Equal(t, startTime.Sub(time1)/time.Second, total/time.Second) @@ -449,7 +449,7 @@ func TestPruneTransfers(t *testing.T) { } s.mu.Lock() - assert.Equal(t, time.Duration(test.Transfers)*time.Second, s.totalDuration()) + assert.Equal(t, time.Duration(test.Transfers)*time.Second, s._totalDuration()) assert.Equal(t, test.Transfers, len(s.startedTransfers)) s.mu.Unlock() @@ -458,7 +458,7 @@ func TestPruneTransfers(t *testing.T) { } s.mu.Lock() - assert.Equal(t, time.Duration(test.Transfers)*time.Second, s.totalDuration()) + assert.Equal(t, time.Duration(test.Transfers)*time.Second, s._totalDuration()) assert.Equal(t, test.ExpectedStartedTransfers, len(s.startedTransfers)) s.mu.Unlock()