From 4a382c09ecab5cee85726d7b4215bb502b7c65b9 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 16 Jun 2022 12:08:11 +0100 Subject: [PATCH] mount: run tests in a subprocess to fix deadlock - fixes #3259 Before this change we ran the tests and the mount in the same process. This could cause deadlocks and often did, and made the mount tests very unreliable. This fixes the problem by running the mount in a separate process and commanding it via a pipe over stdin/stdout. --- vfs/vfstest/dir.go | 8 +- vfs/vfstest/fs.go | 190 +++++++------------------- vfs/vfstest/submount.go | 276 ++++++++++++++++++++++++++++++++++++++ vfs/vfstest/write.go | 4 +- vfs/vfstest/write_unix.go | 2 +- 5 files changed, 327 insertions(+), 153 deletions(-) create mode 100644 vfs/vfstest/submount.go diff --git a/vfs/vfstest/dir.go b/vfs/vfstest/dir.go index 8c6273a1c..0cf02ed0f 100644 --- a/vfs/vfstest/dir.go +++ b/vfs/vfstest/dir.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/rclone/rclone/fs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -175,15 +174,12 @@ func TestDirCacheFlush(t *testing.T) { err := run.fremote.Mkdir(context.Background(), "dir/subdir") require.NoError(t, err) - root, err := run.vfs.Root() - require.NoError(t, err) - // expect newly created "subdir" on remote to not show up - root.ForgetPath("otherdir", fs.EntryDirectory) + run.forget("otherdir") run.readLocal(t, localDm, "") assert.Equal(t, dm, localDm, "expected vs fuse mount") - root.ForgetPath("dir", fs.EntryDirectory) + run.forget("dir") dm = newDirMap("otherdir/|otherdir/file 1|dir/|dir/file 1|dir/subdir/") run.readLocal(t, localDm, "") assert.Equal(t, dm, localDm, "expected vs fuse mount") diff --git a/vfs/vfstest/fs.go b/vfs/vfstest/fs.go index bb9a61fc5..6781fbe31 100644 --- a/vfs/vfstest/fs.go +++ b/vfs/vfstest/fs.go @@ -3,11 +3,11 @@ package vfstest import ( + "bufio" "context" "flag" "fmt" "io" - "io/ioutil" "log" "os" "os/exec" @@ -16,6 +16,7 @@ import ( 
"reflect" "runtime" "strings" + "sync" "testing" "time" @@ -24,8 +25,6 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/fstest" - "github.com/rclone/rclone/lib/file" - "github.com/rclone/rclone/vfs" "github.com/rclone/rclone/vfs/vfscommon" "github.com/rclone/rclone/vfs/vfsflags" "github.com/stretchr/testify/assert" @@ -36,16 +35,19 @@ const ( waitForWritersDelay = 30 * time.Second // time to wait for existing writers ) -var ( - mountFn mountlib.MountFn -) - // RunTests runs all the tests against all the VFS cache modes // -// If useVFS is set then it runs the tests against a VFS rather than amount -func RunTests(t *testing.T, useVFS bool, fn mountlib.MountFn) { - mountFn = fn +// If useVFS is set then it runs the tests against a VFS rather than a +// mount +// +// If useVFS is not set then it runs the mount in a subprocess in +// order to avoid kernel deadlocks. +func RunTests(t *testing.T, useVFS bool, mountFn mountlib.MountFn) { flag.Parse() + if isSubProcess() { + startMount(mountFn, useVFS, *runMount) + return + } tests := []struct { cacheMode vfscommon.CacheMode writeBack time.Duration @@ -56,9 +58,11 @@ func RunTests(t *testing.T, useVFS bool, fn mountlib.MountFn) { {cacheMode: vfscommon.CacheModeFull}, {cacheMode: vfscommon.CacheModeFull, writeBack: 100 * time.Millisecond}, } - run = newRun(useVFS) for _, test := range tests { - run.cacheMode(test.cacheMode, test.writeBack) + vfsOpt := vfsflags.Opt + vfsOpt.CacheMode = test.cacheMode + vfsOpt.WriteBack = test.writeBack + run = newRun(useVFS, &vfsOpt, mountFn) what := fmt.Sprintf("CacheMode=%v", test.cacheMode) if test.writeBack > 0 { what += fmt.Sprintf(",WriteBack=%v", test.writeBack) @@ -93,24 +97,29 @@ func RunTests(t *testing.T, useVFS bool, fn mountlib.MountFn) { t.Run("TestWriteFileAppend", TestWriteFileAppend) }) log.Printf("Finished test run with %s (ok=%v)", what, ok) + run.Finalise() if !ok { break } } - run.Finalise() } // Run holds the remotes 
for a test run type Run struct { os Oser - vfs *vfs.VFS + vfsOpt *vfscommon.Options useVFS bool // set if we are testing a VFS not a mount - mnt *mountlib.MountPoint mountPath string fremote fs.Fs fremoteName string cleanRemote func() skip bool + // For controlling the subprocess running the mount + cmdMu sync.Mutex + cmd *exec.Cmd + in io.ReadCloser + out io.WriteCloser + scanner *bufio.Scanner } // run holds the master Run data @@ -122,10 +131,12 @@ var run *Run // r.fremote is an empty remote Fs // // Finalise() will tidy them away when done. -func newRun(useVFS bool) *Run { +func newRun(useVFS bool, vfsOpt *vfscommon.Options, mountFn mountlib.MountFn) *Run { r := &Run{ useVFS: useVFS, + vfsOpt: vfsOpt, } + r.vfsOpt.Init() fstest.Initialise() var err error @@ -139,118 +150,10 @@ func newRun(useVFS bool) *Run { log.Fatalf("Failed to open mkdir %q: %v", *fstest.RemoteName, err) } - if !r.useVFS { - r.mountPath = findMountPath() - } - // Mount it up - r.mount() - + r.startMountSubProcess() return r } -func findMountPath() string { - if runtime.GOOS != "windows" { - mountPath, err := ioutil.TempDir("", "rclonefs-mount") - if err != nil { - log.Fatalf("Failed to create mount dir: %v", err) - } - return mountPath - } - - // Find a free drive letter - letter := file.FindUnusedDriveLetter() - drive := "" - if letter == 0 { - log.Fatalf("Couldn't find free drive letter for test") - } else { - drive = string(letter) + ":" - } - return drive -} - -func (r *Run) mount() { - log.Printf("mount %q %q", r.fremote, r.mountPath) - var err error - r.mnt = mountlib.NewMountPoint(mountFn, r.mountPath, r.fremote, &mountlib.Opt, &vfsflags.Opt) - - _, err = r.mnt.Mount() - if err != nil { - log.Printf("mount FAILED: %v", err) - r.skip = true - } else { - log.Printf("mount OK") - } - r.vfs = r.mnt.VFS - if r.useVFS { - r.os = vfsOs{r.vfs} - } else { - r.os = realOs{} - } - -} - -func (r *Run) umount() { - if r.skip { - log.Printf("FUSE not found so skipping umount") - return - } - /* - 
log.Printf("Calling fusermount -u %q", r.mountPath) - err := exec.Command("fusermount", "-u", r.mountPath).Run() - if err != nil { - log.Printf("fusermount failed: %v", err) - } - */ - log.Printf("Unmounting %q", r.mountPath) - err := r.mnt.Unmount() - if err != nil { - log.Printf("signal to umount failed - retrying: %v", err) - time.Sleep(3 * time.Second) - err = r.mnt.Unmount() - } - if err != nil { - log.Fatalf("signal to umount failed: %v", err) - } - log.Printf("Waiting for umount") - err = <-r.mnt.ErrChan - if err != nil { - log.Fatalf("umount failed: %v", err) - } - - // Cleanup the VFS cache - umount has called Shutdown - err = r.vfs.CleanUp() - if err != nil { - log.Printf("Failed to cleanup the VFS cache: %v", err) - } -} - -// cacheMode flushes the VFS and changes the CacheMode and the writeBack time -func (r *Run) cacheMode(cacheMode vfscommon.CacheMode, writeBack time.Duration) { - if r.skip { - log.Printf("FUSE not found so skipping cacheMode") - return - } - // Wait for writers to finish - r.vfs.WaitForWriters(waitForWritersDelay) - // Empty and remake the remote - r.cleanRemote() - err := r.fremote.Mkdir(context.Background(), "") - if err != nil { - log.Fatalf("Failed to open mkdir %q: %v", *fstest.RemoteName, err) - } - // Empty the cache - err = r.vfs.CleanUp() - if err != nil { - log.Printf("Failed to cleanup the VFS cache: %v", err) - } - // Reset the cache mode - r.vfs.SetCacheMode(cacheMode) - r.vfs.Opt.WriteBack = writeBack - // Flush the directory cache - r.vfs.FlushDirCache() - -} - func (r *Run) skipIfNoFUSE(t *testing.T) { if r.skip { t.Skip("FUSE not found so skipping test") @@ -265,11 +168,15 @@ func (r *Run) skipIfVFS(t *testing.T) { // Finalise cleans the remote and unmounts func (r *Run) Finalise() { - r.umount() + if !r.useVFS { + r.sendMountCommand("exit") + _, err := r.cmd.Process.Wait() + if err != nil { + log.Fatalf("mount sub process failed: %v", err) + } + } r.cleanRemote() - if r.useVFS { - // FIXME - } else { + if !r.useVFS 
{ err := os.RemoveAll(r.mountPath) if err != nil { log.Printf("Failed to clean mountPath %q: %v", r.mountPath, err) @@ -284,9 +191,9 @@ func (r *Run) path(filePath string) string { } // return windows drive letter root as E:\ if filePath == "" && runtime.GOOS == "windows" { - return run.mountPath + `\` + return r.mountPath + `\` } - return filepath.Join(run.mountPath, filepath.FromSlash(filePath)) + return filepath.Join(r.mountPath, filepath.FromSlash(filePath)) } type dirMap map[string]struct{} @@ -323,10 +230,10 @@ func (r *Run) readLocal(t *testing.T, dir dirMap, filePath string) { if fi.IsDir() { dir[name+"/"] = struct{}{} r.readLocal(t, dir, name) - assert.Equal(t, run.vfs.Opt.DirPerms&os.ModePerm, fi.Mode().Perm()) + assert.Equal(t, r.vfsOpt.DirPerms&os.ModePerm, fi.Mode().Perm()) } else { dir[fmt.Sprintf("%s %d", name, fi.Size())] = struct{}{} - assert.Equal(t, run.vfs.Opt.FilePerms&os.ModePerm, fi.Mode().Perm()) + assert.Equal(t, r.vfsOpt.FilePerms&os.ModePerm, fi.Mode().Perm()) } } } @@ -374,11 +281,6 @@ func (r *Run) checkDir(t *testing.T, dirString string) { assert.Equal(t, dm, localDm, "expected vs fuse mount") } -// wait for any files being written to be released by fuse -func (r *Run) waitForWriters() { - run.vfs.WaitForWriters(waitForWritersDelay) -} - // writeFile writes data to a file named by filename. // If the file does not exist, WriteFile creates it with permissions perm; // otherwise writeFile truncates it before writing. 
@@ -415,25 +317,25 @@ func (r *Run) createFile(t *testing.T, filepath string, contents string) { func (r *Run) readFile(t *testing.T, filepath string) string { filepath = r.path(filepath) - result, err := run.os.ReadFile(filepath) + result, err := r.os.ReadFile(filepath) require.NoError(t, err) return string(result) } func (r *Run) mkdir(t *testing.T, filepath string) { filepath = r.path(filepath) - err := run.os.Mkdir(filepath, 0700) + err := r.os.Mkdir(filepath, 0700) require.NoError(t, err) } func (r *Run) rm(t *testing.T, filepath string) { filepath = r.path(filepath) - err := run.os.Remove(filepath) + err := r.os.Remove(filepath) require.NoError(t, err) // Wait for file to disappear from listing for i := 0; i < 100; i++ { - _, err := run.os.Stat(filepath) + _, err := r.os.Stat(filepath) if os.IsNotExist(err) { return } @@ -444,7 +346,7 @@ func (r *Run) rm(t *testing.T, filepath string) { func (r *Run) rmdir(t *testing.T, filepath string) { filepath = r.path(filepath) - err := run.os.Remove(filepath) + err := r.os.Remove(filepath) require.NoError(t, err) } @@ -470,5 +372,5 @@ func TestRoot(t *testing.T) { fi, err := os.Lstat(run.mountPath) require.NoError(t, err) assert.True(t, fi.IsDir()) - assert.Equal(t, run.vfs.Opt.DirPerms&os.ModePerm, fi.Mode().Perm()) + assert.Equal(t, run.vfsOpt.DirPerms&os.ModePerm, fi.Mode().Perm()) } diff --git a/vfs/vfstest/submount.go b/vfs/vfstest/submount.go new file mode 100644 index 000000000..e405dd847 --- /dev/null +++ b/vfs/vfstest/submount.go @@ -0,0 +1,276 @@ +package vfstest + +import ( + "bufio" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "os/exec" + "runtime" + "strings" + "time" + + "github.com/rclone/rclone/cmd/mountlib" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/cache" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/lib/file" + "github.com/rclone/rclone/vfs" + "github.com/rclone/rclone/vfs/vfscommon" +) + +// Functions to run and control the 
mount subprocess + +var ( + runMount = flag.String("run-mount", "", "If set, run the mount subprocess with the options (internal use only)") +) + +// Options for the mount sub processes passed with the -run-mount flag +type runMountOpt struct { + MountPoint string + MountOpt mountlib.Options + VFSOpt vfscommon.Options + Remote string +} + +// Start the mount subprocess and wait for it to start +func (r *Run) startMountSubProcess() { + // If testing the VFS we don't start a subprocess, we just use + // the VFS directly + if r.useVFS { + vfs := vfs.New(r.fremote, r.vfsOpt) + r.os = vfsOs{vfs} + return + } + r.os = realOs{} + r.mountPath = findMountPath() + log.Printf("startMountSubProcess %q (%q) %q", r.fremote, r.fremoteName, r.mountPath) + + opt := runMountOpt{ + MountPoint: r.mountPath, + MountOpt: mountlib.Opt, + VFSOpt: *r.vfsOpt, + Remote: r.fremoteName, + } + + opts, err := json.Marshal(&opt) + if err != nil { + log.Fatal(err) + } + + // Re-run this executable with a new option -run-mount + args := append(os.Args, "-run-mount", string(opts)) + r.cmd = exec.Command(args[0], args[1:]...) 
+ r.cmd.Stderr = os.Stderr + r.out, err = r.cmd.StdinPipe() + if err != nil { + log.Fatal(err) + } + r.in, err = r.cmd.StdoutPipe() + if err != nil { + log.Fatal(err) + } + err = r.cmd.Start() + if err != nil { + log.Fatal("startMountSubProcess failed", err) + } + r.scanner = bufio.NewScanner(r.in) + + // Wait it for startup + log.Print("Waiting for mount to start") + for r.scanner.Scan() { + rx := strings.TrimSpace(r.scanner.Text()) + if rx == "STARTED" { + break + } + log.Printf("..Mount said: %s", rx) + } + if r.scanner.Err() != nil { + log.Printf("scanner err %v", r.scanner.Err()) + } + + log.Printf("startMountSubProcess: end") +} + +// Find a free path to run the mount on +func findMountPath() string { + if runtime.GOOS != "windows" { + mountPath, err := ioutil.TempDir("", "rclonefs-mount") + if err != nil { + log.Fatalf("Failed to create mount dir: %v", err) + } + return mountPath + } + + // Find a free drive letter + letter := file.FindUnusedDriveLetter() + drive := "" + if letter == 0 { + log.Fatalf("Couldn't find free drive letter for test") + } else { + drive = string(letter) + ":" + } + return drive +} + +// Return true if we are running as a subprocess to run the mount +func isSubProcess() bool { + return *runMount != "" +} + +// Run the mount - this is running in a subprocesses and the config +// is passed JSON encoded as the -run-mount parameter +// +// It reads commands from standard input and writes results to +// standard output. 
+func startMount(mountFn mountlib.MountFn, useVFS bool, opts string) { + log.Print("startMount") + ctx := context.Background() + + var opt runMountOpt + err := json.Unmarshal([]byte(opts), &opt) + if err != nil { + log.Fatalf("Unmarshal failed: %v", err) + } + + fstest.Initialise() + + f, err := cache.Get(ctx, opt.Remote) + if err != nil { + log.Fatalf("Failed to open remote %q: %v", opt.Remote, err) + } + + err = f.Mkdir(ctx, "") + if err != nil { + log.Fatalf("Failed to mkdir %q: %v", opt.Remote, err) + } + + log.Printf("startMount: Mounting %q on %q with %q", opt.Remote, opt.MountPoint, opt.VFSOpt.CacheMode) + mnt := mountlib.NewMountPoint(mountFn, opt.MountPoint, f, &opt.MountOpt, &opt.VFSOpt) + + _, err = mnt.Mount() + if err != nil { + log.Fatalf("mount FAILED %q: %v", opt.Remote, err) + } + defer umount(mnt) + log.Printf("startMount: mount OK") + fmt.Println("STARTED") // signal to parent all is good + + // Read commands from stdin + scanner := bufio.NewScanner(os.Stdin) + exit := false + for !exit && scanner.Scan() { + rx := strings.Trim(scanner.Text(), "\r\n") + var tx string + tx, exit = doMountCommand(mnt.VFS, rx) + fmt.Println(tx) + } + + err = scanner.Err() + if err != nil { + log.Fatalf("scanner failed %q: %v", opt.Remote, err) + } +} + +// Do a mount command which is a line read from stdin and return a +// line to send to stdout with an exit flag. 
+// +// The format of the lines is +// command \t parameter (optional) +// The response should be +// OK|ERR \t result (optional) +func doMountCommand(vfs *vfs.VFS, rx string) (tx string, exit bool) { + command := strings.Split(rx, "\t") + // log.Printf("doMountCommand: %q received", command) + var out = []string{"OK", ""} + switch command[0] { + case "waitForWriters": + vfs.WaitForWriters(waitForWritersDelay) + case "forget": + root, err := vfs.Root() + if err != nil { + out = []string{"ERR", err.Error()} + } else { + root.ForgetPath(command[1], fs.EntryDirectory) + } + case "exit": + exit = true + default: + out = []string{"ERR", "command not found"} + } + return strings.Join(out, "\t"), exit +} + +// Send a command to the mount subprocess and await a response +func (r *Run) sendMountCommand(args ...string) { + r.cmdMu.Lock() + defer r.cmdMu.Unlock() + tx := strings.Join(args, "\t") + // log.Printf("Send mount command: %q", tx) + var rx string + if r.useVFS { + // if using VFS do the VFS command directly + rx, _ = doMountCommand(r.os.(vfsOs).VFS, tx) + } else { + _, err := io.WriteString(r.out, tx+"\n") + if err != nil { + log.Fatalf("WriteString err %v", err) + } + if !r.scanner.Scan() { + log.Fatalf("Mount has gone away") + } + rx = strings.Trim(r.scanner.Text(), "\r\n") + } + in := strings.Split(rx, "\t") + // log.Printf("Answer is %q", in) + if in[0] != "OK" { + log.Fatalf("Error from mount: %q", in[1:]) + } +} + +// wait for any files being written to be released by fuse +func (r *Run) waitForWriters() { + r.sendMountCommand("waitForWriters") +} + +// forget the directory passed in +func (r *Run) forget(dir string) { + r.sendMountCommand("forget", dir) +} + +// Unmount the mount +func umount(mnt *mountlib.MountPoint) { + /* + log.Printf("Calling fusermount -u %q", mountPath) + err := exec.Command("fusermount", "-u", mountPath).Run() + if err != nil { + log.Printf("fusermount failed: %v", err) + } + */ + log.Printf("Unmounting %q", mnt.MountPoint) + err := 
mnt.Unmount() + if err != nil { + log.Printf("signal to umount failed - retrying: %v", err) + time.Sleep(3 * time.Second) + err = mnt.Unmount() + } + if err != nil { + log.Fatalf("signal to umount failed: %v", err) + } + log.Printf("Waiting for umount") + err = <-mnt.ErrChan + if err != nil { + log.Fatalf("umount failed: %v", err) + } + + // Cleanup the VFS cache - umount has called Shutdown + err = mnt.VFS.CleanUp() + if err != nil { + log.Printf("Failed to cleanup the VFS cache: %v", err) + } +} diff --git a/vfs/vfstest/write.go b/vfs/vfstest/write.go index 03ad7bfa1..d4c2e0d92 100644 --- a/vfs/vfstest/write.go +++ b/vfs/vfstest/write.go @@ -91,7 +91,7 @@ func TestWriteFileDup(t *testing.T) { run.skipIfVFS(t) run.skipIfNoFUSE(t) - if run.vfs.Opt.CacheMode < vfscommon.CacheModeWrites { + if run.vfsOpt.CacheMode < vfscommon.CacheModeWrites { t.Skip("not supported on vfs-cache-mode < writes") return } @@ -136,7 +136,7 @@ func TestWriteFileDup(t *testing.T) { func TestWriteFileAppend(t *testing.T) { run.skipIfNoFUSE(t) - if run.vfs.Opt.CacheMode < vfscommon.CacheModeWrites { + if run.vfsOpt.CacheMode < vfscommon.CacheModeWrites { t.Skip("not supported on vfs-cache-mode < writes") return } diff --git a/vfs/vfstest/write_unix.go b/vfs/vfstest/write_unix.go index 0d412be07..4ce40c970 100644 --- a/vfs/vfstest/write_unix.go +++ b/vfs/vfstest/write_unix.go @@ -46,7 +46,7 @@ func TestWriteFileDoubleClose(t *testing.T) { // write to the other dup _, err = unix.Write(fd2, buf) - if run.vfs.Opt.CacheMode < vfscommon.CacheModeWrites { + if run.vfsOpt.CacheMode < vfscommon.CacheModeWrites { // produces an error if cache mode < writes assert.Error(t, err, "input/output error") } else {