diff --git a/cmd/rc/rc.go b/cmd/rc/rc.go index 2f95098b4..bea9df92d 100644 --- a/cmd/rc/rc.go +++ b/cmd/rc/rc.go @@ -16,6 +16,7 @@ import ( "github.com/rclone/rclone/fs/config/flags" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/fs/rc/jobs" "github.com/spf13/cobra" "github.com/spf13/pflag" ) @@ -164,7 +165,7 @@ func doCall(ctx context.Context, path string, in rc.Params) (out rc.Params, err if call == nil { return nil, errors.Errorf("method %q not found", path) } - out, err = call.Fn(context.Background(), in) + _, out, err := jobs.NewJob(ctx, call.Fn, in) if err != nil { return nil, errors.Wrap(err, "loopback call failed") } diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 049f6ef6d..e5fdae076 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -11,7 +11,6 @@ import ( "time" "github.com/pkg/errors" - "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/rc" ) @@ -174,32 +173,56 @@ func (jobs *Jobs) Get(ID int64) *Job { return jobs.jobs[ID] } -func getGroup(in rc.Params) string { - // Check to see if the group is set +// Check to see if the group is set +func getGroup(ctx context.Context, in rc.Params, id int64) (context.Context, string, error) { group, err := in.GetString("_group") if rc.NotErrParamNotFound(err) { - fs.Errorf(nil, "Can't get _group param %+v", err) + return ctx, "", err } delete(in, "_group") - return group -} - -// NewJob creates a Job ready to be executed -func (jobs *Jobs) NewJob(ctx context.Context, in rc.Params) (*Job, context.Context) { - id := atomic.AddInt64(&jobID, 1) - - group := getGroup(in) if group == "" { group = fmt.Sprintf("job/%d", id) } ctx = accounting.WithStatsGroup(ctx, group) + return ctx, group, nil +} + +// See if _async is set returning a boolean and a possible new context +func getAsync(ctx context.Context, in rc.Params) (context.Context, bool, error) { + isAsync, err := in.GetBool("_async") + if rc.NotErrParamNotFound(err) { + return ctx, false, err + } + delete(in, "_async") // remove the async parameter after parsing + if isAsync { + // unlink this job from the current context + ctx = context.Background() + } + return ctx, isAsync, nil +} + +// NewJob creates a Job and executes it, possibly in the background if _async is set +func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) { + id := atomic.AddInt64(&jobID, 1) + in = in.Copy() // copy input so we can change it + + ctx, isAsync, err := getAsync(ctx, in) + if err != nil { + return nil, nil, err + } + + ctx, group, err := getGroup(ctx, in, id) + if err != nil { + return nil, nil, err + } + ctx, cancel := context.WithCancel(ctx) stop := func() { cancel() // Wait for cancel to propagate before returning. <-ctx.Done() } - job := &Job{ + job = &Job{ ID: id, Group: group, StartTime: time.Now(), @@ -208,31 +231,23 @@ func (jobs *Jobs) NewJob(ctx context.Context, in rc.Params) (*Job, context.Conte jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() - return job, ctx + if isAsync { + go job.run(ctx, fn, in) + out = make(rc.Params) + out["jobid"] = job.ID + err = nil + } else { + job.run(ctx, fn, in) + out = job.Output + err = job.realErr + } + return job, out, err } -// NewAsyncJob start a new asynchronous Job off -func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { - job, ctx := jobs.NewJob(context.Background(), in) - go job.run(ctx, fn, in) - return job -} - -// StartAsyncJob starts a new job asynchronously and returns a Param suitable -// for output. -func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) { - job := running.NewAsyncJob(fn, in) - out := make(rc.Params) - out["jobid"] = job.ID - return out, nil -} - -// ExecuteJob executes new job synchronously and returns a Param suitable for -// output. -func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) { - job, ctx := running.NewJob(ctx, in) - job.run(ctx, fn, in) - return job.Output, job.ID, job.realErr +// NewJob creates a Job and executes it on the global job queue, +// possibly in the background if _async is set +func NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) { + return running.NewJob(ctx, fn, in) } // OnFinish adds listener to jobid that will be triggered when job is finished. diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 4bcf40c28..6af2455b3 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -36,14 +36,17 @@ func TestJobsKickExpire(t *testing.T) { func TestJobsExpire(t *testing.T) { testy.SkipUnreliable(t) + ctx := context.Background() wait := make(chan struct{}) jobs := newJobs() jobs.opt.JobExpireInterval = time.Millisecond assert.Equal(t, false, jobs.expireRunning) - job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) { + job, out, err := jobs.NewJob(ctx, func(ctx context.Context, in rc.Params) (rc.Params, error) { defer close(wait) return in, nil - }, rc.Params{}) + }, rc.Params{"_async": true}) + require.NoError(t, err) + assert.Equal(t, 1, len(out)) <-wait assert.Equal(t, 1, len(jobs.jobs)) jobs.Expire() @@ -66,9 +69,12 @@ var noopFn = func(ctx context.Context, in rc.Params) (rc.Params, error) { } func TestJobsIDs(t *testing.T) { + ctx := context.Background() jobs := newJobs() - job1 := jobs.NewAsyncJob(noopFn, rc.Params{}) - job2 := jobs.NewAsyncJob(noopFn, rc.Params{}) + job1, _, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true}) + require.NoError(t, err) + job2, _, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true}) + require.NoError(t, err) wantIDs := []int64{job1.ID, job2.ID} gotIDs := jobs.IDs() require.Equal(t, 2, len(gotIDs)) @@ -79,8 +85,10 @@ func TestJobsIDs(t *testing.T) { } func TestJobsGet(t *testing.T) { + ctx := context.Background() jobs := newJobs() - job := jobs.NewAsyncJob(noopFn, rc.Params{}) + job, _, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true}) + require.NoError(t, err) assert.Equal(t, job, jobs.Get(job.ID)) assert.Nil(t, jobs.Get(123123123123)) } @@ -125,8 +133,10 @@ func sleepJob() { } func TestJobFinish(t *testing.T) { + ctx := context.Background() jobs := newJobs() - job := jobs.NewAsyncJob(longFn, rc.Params{}) + job, _, err := jobs.NewJob(ctx, longFn, rc.Params{"_async": true}) + require.NoError(t, err) sleepJob() assert.Equal(t, true, job.EndTime.IsZero()) @@ -146,7 +156,8 @@ func TestJobFinish(t *testing.T) { assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Finished) - job = jobs.NewAsyncJob(longFn, rc.Params{}) + job, _, err = jobs.NewJob(ctx, longFn, rc.Params{"_async": true}) + require.NoError(t, err) sleepJob() job.finish(nil, nil) @@ -157,7 +168,8 @@ func TestJobFinish(t *testing.T) { assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Finished) - job = jobs.NewAsyncJob(longFn, rc.Params{}) + job, _, err = jobs.NewJob(ctx, longFn, rc.Params{"_async": true}) + require.NoError(t, err) sleepJob() job.finish(wantOut, errors.New("potato")) @@ -172,6 +184,7 @@ func TestJobFinish(t *testing.T) { // We've tested the functionality of run() already as it is // part of NewJob, now just test the panic catching func TestJobRunPanic(t *testing.T) { + ctx := context.Background() wait := make(chan struct{}) boom := func(ctx context.Context, in rc.Params) (rc.Params, error) { sleepJob() @@ -180,7 +193,8 @@ func TestJobRunPanic(t *testing.T) { } jobs := newJobs() - job := jobs.NewAsyncJob(boom, rc.Params{}) + job, _, err := jobs.NewJob(ctx, boom, rc.Params{"_async": true}) + require.NoError(t, err) <-wait runtime.Gosched() // yield to make sure job is updated @@ -206,42 +220,50 @@ func TestJobRunPanic(t *testing.T) { } func TestJobsNewJob(t *testing.T) { + ctx := context.Background() jobID = 0 jobs := newJobs() - job := jobs.NewAsyncJob(noopFn, rc.Params{}) + job, out, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true}) + require.NoError(t, err) assert.Equal(t, int64(1), job.ID) + assert.Equal(t, rc.Params{"jobid": int64(1)}, out) assert.Equal(t, job, jobs.Get(1)) assert.NotEmpty(t, job.Stop) } func TestStartJob(t *testing.T) { + ctx := context.Background() jobID = 0 - out, err := StartAsyncJob(longFn, rc.Params{}) + job, out, err := NewJob(ctx, longFn, rc.Params{"_async": true}) assert.NoError(t, err) assert.Equal(t, rc.Params{"jobid": int64(1)}, out) + assert.Equal(t, int64(1), job.ID) } func TestExecuteJob(t *testing.T) { jobID = 0 - _, id, err := ExecuteJob(context.Background(), shortFn, rc.Params{}) + job, out, err := NewJob(context.Background(), shortFn, rc.Params{}) assert.NoError(t, err) - assert.Equal(t, int64(1), id) + assert.Equal(t, int64(1), job.ID) + assert.Equal(t, rc.Params{}, out) } func TestExecuteJobErrorPropagation(t *testing.T) { + ctx := context.Background() jobID = 0 testErr := errors.New("test error") errorFn := func(ctx context.Context, in rc.Params) (out rc.Params, err error) { return nil, testErr } - _, _, err := ExecuteJob(context.Background(), errorFn, rc.Params{}) + _, _, err := NewJob(ctx, errorFn, rc.Params{}) assert.Equal(t, testErr, err) } func TestRcJobStatus(t *testing.T) { + ctx := context.Background() jobID = 0 - _, err := StartAsyncJob(longFn, rc.Params{}) + _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true}) assert.NoError(t, err) call := rc.Calls.Get("job/status") @@ -267,8 +289,9 @@ func TestRcJobStatus(t *testing.T) { } func TestRcJobList(t *testing.T) { + ctx := context.Background() jobID = 0 - _, err := StartAsyncJob(longFn, rc.Params{}) + _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true}) assert.NoError(t, err) call := rc.Calls.Get("job/list") @@ -281,8 +304,9 @@ func TestRcJobList(t *testing.T) { } func TestRcAsyncJobStop(t *testing.T) { + ctx := context.Background() jobID = 0 - _, err := StartAsyncJob(ctxFn, rc.Params{}) + _, _, err := NewJob(ctx, ctxFn, rc.Params{"_async": true}) assert.NoError(t, err) call := rc.Calls.Get("job/stop") @@ -320,9 +344,10 @@ func TestRcSyncJobStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go func() { jobID = 0 - _, id, err := ExecuteJob(ctx, ctxFn, rc.Params{}) + job, out, err := NewJob(ctx, ctxFn, rc.Params{}) assert.Error(t, err) - assert.Equal(t, int64(1), id) + assert.Equal(t, int64(1), job.ID) + assert.Equal(t, rc.Params{}, out) }() time.Sleep(10 * time.Millisecond) @@ -363,10 +388,10 @@ func TestOnFinish(t *testing.T) { jobID = 0 done := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) - _, err := StartAsyncJob(ctxParmFn(ctx, false), rc.Params{}) + job, _, err := NewJob(ctx, ctxParmFn(ctx, false), rc.Params{"_async": true}) assert.NoError(t, err) - stop, err := OnFinish(jobID, func() { close(done) }) + stop, err := OnFinish(job.ID, func() { close(done) }) defer stop() assert.NoError(t, err) @@ -384,10 +409,10 @@ func TestOnFinishAlreadyFinished(t *testing.T) { done := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, id, err := ExecuteJob(ctx, shortFn, rc.Params{}) + job, _, err := NewJob(ctx, shortFn, rc.Params{}) assert.NoError(t, err) - stop, err := OnFinish(id, func() { close(done) }) + stop, err := OnFinish(job.ID, func() { close(done) }) defer stop() assert.NoError(t, err) diff --git a/fs/rc/rcserver/rcserver.go b/fs/rc/rcserver/rcserver.go index 3c6945ebc..f72935846 100644 --- a/fs/rc/rcserver/rcserver.go +++ b/fs/rc/rcserver/rcserver.go @@ -229,6 +229,7 @@ func (s *Server) handler(w http.ResponseWriter, r *http.Request) { } func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string) { + ctx := r.Context() contentType := r.Header.Get("Content-Type") values := r.URL.Query() @@ -282,22 +283,10 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string) in["_response"] = w } - // Check to see if it is async or not - isAsync, err := in.GetBool("_async") - if rc.NotErrParamNotFound(err) { - writeError(path, inOrig, w, err, http.StatusBadRequest) - return - } - delete(in, "_async") // remove the async parameter after parsing so vfs operations don't get confused - fs.Debugf(nil, "rc: %q: with parameters %+v", path, in) - var out rc.Params - if isAsync { - out, err = jobs.StartAsyncJob(call.Fn, in) - } else { - var jobID int64 - out, jobID, err = jobs.ExecuteJob(r.Context(), call.Fn, in) - w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", jobID)) + job, out, err := jobs.NewJob(ctx, call.Fn, in) + if job != nil { + w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", job.ID)) } if err != nil { writeError(path, inOrig, w, err, http.StatusInternalServerError)