sftp: Fix slow downloads for long latency connections - fixes #1158

This was caused by using the sftp.File.Read method which resets the
streaming window after each call.  Replacing it with sftp.File.WriteTo
and an io.Pipe fixes the problem bringing the speed to the same as the
sftp binary.
This commit is contained in:
Nick Craig-Wood 2018-05-24 15:03:57 +01:00
parent 3a2248aa5f
commit d4213c0ac5
1 changed files with 38 additions and 13 deletions

View File

@ -125,12 +125,6 @@ type Object struct {
sha1sum *string // Cached SHA1 checksum
}
// ObjectReader holds the sftp.File interface to a remote SFTP file opened for reading
type ObjectReader struct {
object *Object
sftpFile *sftp.File
}
// readCurrentUser finds the current user name or "" if not found
func readCurrentUser() (userName string) {
usr, err := user.Current()
@ -877,15 +871,49 @@ func (o *Object) Storable() bool {
return o.mode.IsRegular()
}
// objectReader represents a file open for reading on the SFTP server
type objectReader struct {
sftpFile *sftp.File
pipeReader *io.PipeReader
done chan struct{}
}
func newObjectReader(sftpFile *sftp.File) *objectReader {
pipeReader, pipeWriter := io.Pipe()
file := &objectReader{
sftpFile: sftpFile,
pipeReader: pipeReader,
done: make(chan struct{}),
}
go func() {
// Use sftpFile.WriteTo to pump data so that it gets a
// chance to build the window up.
_, err := sftpFile.WriteTo(pipeWriter)
// Close the pipeWriter so the pipeReader fails with
// the same error or EOF if err == nil
_ = pipeWriter.CloseWithError(err)
// signal that we've finished
close(file.done)
}()
return file
}
// Read from a remote sftp file object reader
func (file *ObjectReader) Read(p []byte) (n int, err error) {
n, err = file.sftpFile.Read(p)
func (file *objectReader) Read(p []byte) (n int, err error) {
n, err = file.pipeReader.Read(p)
return n, err
}
// Close a reader of a remote sftp file
func (file *ObjectReader) Close() (err error) {
func (file *objectReader) Close() (err error) {
// Close the sftpFile - this will likely cause the WriteTo to error
err = file.sftpFile.Close()
// Close the pipeReader so writes to the pipeWriter fail
_ = file.pipeReader.Close()
// Wait for the background process to finish
<-file.done
return err
}
@ -919,10 +947,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
return nil, errors.Wrap(err, "Open Seek failed")
}
}
in = readers.NewLimitedReadCloser(&ObjectReader{
object: o,
sftpFile: sftpFile,
}, limit)
in = readers.NewLimitedReadCloser(newObjectReader(sftpFile), limit)
return in, nil
}