b2: fix stats accounting for upload - fixes #602

This commit is contained in:
Nick Craig-Wood 2016-08-22 21:19:38 +01:00
parent 8a771450d2
commit 037a000cc8
2 changed files with 129 additions and 59 deletions

View File

@ -166,7 +166,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
Method: "POST",
Absolute: true,
Path: upload.UploadURL,
Body: bytes.NewBuffer(body),
Body: fs.AccountPart(up.o, bytes.NewBuffer(body)),
ExtraHeaders: map[string]string{
"Authorization": upload.AuthorizationToken,
"X-Bz-Part-Number": fmt.Sprintf("%d", part),
@ -240,6 +240,7 @@ func (up *largeUpload) Upload() error {
errs := make(chan error, 1)
var wg sync.WaitGroup
var err error
fs.AccountByPart(up.o) // Cancel whole file accounting before reading
outer:
for part := int64(1); part <= up.parts; part++ {
// Check any errors

View File

@ -257,6 +257,8 @@ type Account struct {
avg ewma.MovingAverage // Moving average of last few measurements
closed bool // set if the file is closed
exit chan struct{} // channel that will be closed when transfer is finished
wholeFileDisabled bool // disables the whole file when doing parts
}
// NewAccount makes a Account reader for an object
@ -274,46 +276,56 @@ func NewAccount(in io.ReadCloser, obj Object) *Account {
return acc
}
func (file *Account) averageLoop() {
// disableWholeFileAccounting turns off the whole file accounting
func (acc *Account) disableWholeFileAccounting() {
acc.mu.Lock()
acc.wholeFileDisabled = true
acc.mu.Unlock()
}
// accountPart disables the whole file counter and returns an
// io.Reader to wrap a segment of the transfer.
func (acc *Account) accountPart(in io.Reader) io.Reader {
return newAccountStream(acc, in)
}
func (acc *Account) averageLoop() {
tick := time.NewTicker(time.Second)
defer tick.Stop()
for {
select {
case now := <-tick.C:
file.statmu.Lock()
acc.statmu.Lock()
// Add average of last second.
elapsed := now.Sub(file.lpTime).Seconds()
avg := float64(file.lpBytes) / elapsed
file.avg.Add(avg)
file.lpBytes = 0
file.lpTime = now
elapsed := now.Sub(acc.lpTime).Seconds()
avg := float64(acc.lpBytes) / elapsed
acc.avg.Add(avg)
acc.lpBytes = 0
acc.lpTime = now
// Unlock stats
file.statmu.Unlock()
case <-file.exit:
acc.statmu.Unlock()
case <-acc.exit:
return
}
}
}
// Read bytes from the object - see io.Reader
func (file *Account) Read(p []byte) (n int, err error) {
file.mu.Lock()
defer file.mu.Unlock()
// read bytes from the io.Reader passed in and account them
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
// Set start time.
file.statmu.Lock()
if file.start.IsZero() {
file.start = time.Now()
acc.statmu.Lock()
if acc.start.IsZero() {
acc.start = time.Now()
}
file.statmu.Unlock()
acc.statmu.Unlock()
n, err = file.in.Read(p)
n, err = in.Read(p)
// Update Stats
file.statmu.Lock()
file.lpBytes += n
file.bytes += int64(n)
file.statmu.Unlock()
acc.statmu.Lock()
acc.lpBytes += n
acc.bytes += int64(n)
acc.statmu.Unlock()
Stats.Bytes(int64(n))
@ -324,69 +336,80 @@ func (file *Account) Read(p []byte) (n int, err error) {
return
}
// Read bytes from the object - see io.Reader
func (acc *Account) Read(p []byte) (n int, err error) {
acc.mu.Lock()
defer acc.mu.Unlock()
if acc.wholeFileDisabled {
// Don't account
return acc.in.Read(p)
}
return acc.read(acc.in, p)
}
// Progress returns bytes read as well as the size.
// Size can be <= 0 if the size is unknown.
func (file *Account) Progress() (bytes, size int64) {
if file == nil {
func (acc *Account) Progress() (bytes, size int64) {
if acc == nil {
return 0, 0
}
file.statmu.Lock()
acc.statmu.Lock()
if bytes > size {
size = 0
}
defer file.statmu.Unlock()
return file.bytes, file.size
defer acc.statmu.Unlock()
return acc.bytes, acc.size
}
// Speed returns the speed of the current file transfer
// in bytes per second, as well a an exponentially weighted moving average
// If no read has completed yet, 0 is returned for both values.
func (file *Account) Speed() (bps, current float64) {
if file == nil {
func (acc *Account) Speed() (bps, current float64) {
if acc == nil {
return 0, 0
}
file.statmu.Lock()
defer file.statmu.Unlock()
if file.bytes == 0 {
acc.statmu.Lock()
defer acc.statmu.Unlock()
if acc.bytes == 0 {
return 0, 0
}
// Calculate speed from first read.
total := float64(time.Now().Sub(file.start)) / float64(time.Second)
bps = float64(file.bytes) / total
current = file.avg.Value()
total := float64(time.Now().Sub(acc.start)) / float64(time.Second)
bps = float64(acc.bytes) / total
current = acc.avg.Value()
return
}
// ETA returns the ETA of the current operation,
// rounded to full seconds.
// If the ETA cannot be determined 'ok' returns false.
func (file *Account) ETA() (eta time.Duration, ok bool) {
if file == nil || file.size <= 0 {
func (acc *Account) ETA() (eta time.Duration, ok bool) {
if acc == nil || acc.size <= 0 {
return 0, false
}
file.statmu.Lock()
defer file.statmu.Unlock()
if file.bytes == 0 {
acc.statmu.Lock()
defer acc.statmu.Unlock()
if acc.bytes == 0 {
return 0, false
}
left := file.size - file.bytes
left := acc.size - acc.bytes
if left <= 0 {
return 0, true
}
avg := file.avg.Value()
avg := acc.avg.Value()
if avg <= 0 {
return 0, false
}
seconds := float64(left) / file.avg.Value()
seconds := float64(left) / acc.avg.Value()
return time.Duration(time.Second * time.Duration(int(seconds))), true
}
// String produces stats for this file
func (file *Account) String() string {
a, b := file.Progress()
avg, cur := file.Speed()
eta, etaok := file.ETA()
func (acc *Account) String() string {
a, b := acc.Progress()
avg, cur := acc.Speed()
eta, etaok := acc.ETA()
etas := "-"
if etaok {
if eta > 0 {
@ -395,7 +418,7 @@ func (file *Account) String() string {
etas = "0s"
}
}
name := []rune(file.name)
name := []rune(acc.name)
if len(name) > 45 {
where := len(name) - 42
name = append([]rune{'.', '.', '.'}, name[where:]...)
@ -407,17 +430,63 @@ func (file *Account) String() string {
}
// Close the object
func (file *Account) Close() error {
file.mu.Lock()
defer file.mu.Unlock()
if file.closed {
func (acc *Account) Close() error {
acc.mu.Lock()
defer acc.mu.Unlock()
if acc.closed {
return nil
}
file.closed = true
close(file.exit)
Stats.inProgress.clear(file.name)
return file.in.Close()
acc.closed = true
close(acc.exit)
Stats.inProgress.clear(acc.name)
return acc.in.Close()
}
// accountStream accounts a single io.Reader into a parent *Account
type accountStream struct {
acc *Account
in io.Reader
}
// newAccountStream makes a new accountStream for an in
func newAccountStream(acc *Account, in io.Reader) *accountStream {
return &accountStream{
acc: acc,
in: in,
}
}
// Read bytes from the object - see io.Reader
func (a *accountStream) Read(p []byte) (n int, err error) {
return a.acc.read(a.in, p)
}
// AccountByPart turns off whole file accounting
//
// Returns the current account or nil if not found
func AccountByPart(obj Object) *Account {
acc := Stats.inProgress.get(obj.Remote())
if acc == nil {
Debug(obj, "Didn't find object to account part transfer")
}
acc.disableWholeFileAccounting()
return acc
}
// AccountPart accounts for part of a transfer
//
// It disables the whole file counter and returns an io.Reader to wrap
// a segment of the transfer.
func AccountPart(obj Object, in io.Reader) io.Reader {
acc := AccountByPart(obj)
if acc == nil {
return in
}
return acc.accountPart(in)
}
// Check it satisfies the interface
var _ io.ReadCloser = &Account{}
var (
_ io.ReadCloser = &Account{}
_ io.Reader = &accountStream{}
)