rc: factor async/sync job handing into rc/jobs from rc/rcserver

This fixes async jobs with `rclone rc --loopback` which isn't very
important but sets the stage for _config setting.
This commit is contained in:
Nick Craig-Wood 2020-12-12 15:35:30 +00:00
parent a0fc10e41a
commit 8574a7bd67
4 changed files with 105 additions and 75 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/rclone/rclone/fs/config/flags" "github.com/rclone/rclone/fs/config/flags"
"github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/rc" "github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs/rc/jobs"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"
) )
@ -164,7 +165,7 @@ func doCall(ctx context.Context, path string, in rc.Params) (out rc.Params, err
if call == nil { if call == nil {
return nil, errors.Errorf("method %q not found", path) return nil, errors.Errorf("method %q not found", path)
} }
out, err = call.Fn(context.Background(), in) _, out, err := jobs.NewJob(ctx, call.Fn, in)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "loopback call failed") return nil, errors.Wrap(err, "loopback call failed")
} }

View File

@ -11,7 +11,6 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/rc" "github.com/rclone/rclone/fs/rc"
) )
@ -174,32 +173,56 @@ func (jobs *Jobs) Get(ID int64) *Job {
return jobs.jobs[ID] return jobs.jobs[ID]
} }
func getGroup(in rc.Params) string { // Check to see if the group is set
// Check to see if the group is set func getGroup(ctx context.Context, in rc.Params, id int64) (context.Context, string, error) {
group, err := in.GetString("_group") group, err := in.GetString("_group")
if rc.NotErrParamNotFound(err) { if rc.NotErrParamNotFound(err) {
fs.Errorf(nil, "Can't get _group param %+v", err) return ctx, "", err
} }
delete(in, "_group") delete(in, "_group")
return group
}
// NewJob creates a Job ready to be executed
func (jobs *Jobs) NewJob(ctx context.Context, in rc.Params) (*Job, context.Context) {
id := atomic.AddInt64(&jobID, 1)
group := getGroup(in)
if group == "" { if group == "" {
group = fmt.Sprintf("job/%d", id) group = fmt.Sprintf("job/%d", id)
} }
ctx = accounting.WithStatsGroup(ctx, group) ctx = accounting.WithStatsGroup(ctx, group)
return ctx, group, nil
}
// See if _async is set returning a boolean and a possible new context
func getAsync(ctx context.Context, in rc.Params) (context.Context, bool, error) {
isAsync, err := in.GetBool("_async")
if rc.NotErrParamNotFound(err) {
return ctx, false, err
}
delete(in, "_async") // remove the async parameter after parsing
if isAsync {
// unlink this job from the current context
ctx = context.Background()
}
return ctx, isAsync, nil
}
// NewJob creates a Job and executes it, possibly in the background if _async is set
func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) {
id := atomic.AddInt64(&jobID, 1)
in = in.Copy() // copy input so we can change it
ctx, isAsync, err := getAsync(ctx, in)
if err != nil {
return nil, nil, err
}
ctx, group, err := getGroup(ctx, in, id)
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
stop := func() { stop := func() {
cancel() cancel()
// Wait for cancel to propagate before returning. // Wait for cancel to propagate before returning.
<-ctx.Done() <-ctx.Done()
} }
job := &Job{ job = &Job{
ID: id, ID: id,
Group: group, Group: group,
StartTime: time.Now(), StartTime: time.Now(),
@ -208,31 +231,23 @@ func (jobs *Jobs) NewJob(ctx context.Context, in rc.Params) (*Job, context.Conte
jobs.mu.Lock() jobs.mu.Lock()
jobs.jobs[job.ID] = job jobs.jobs[job.ID] = job
jobs.mu.Unlock() jobs.mu.Unlock()
return job, ctx if isAsync {
go job.run(ctx, fn, in)
out = make(rc.Params)
out["jobid"] = job.ID
err = nil
} else {
job.run(ctx, fn, in)
out = job.Output
err = job.realErr
}
return job, out, err
} }
// NewAsyncJob start a new asynchronous Job off // NewJob creates a Job and executes it on the global job queue,
func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { // possibly in the background if _async is set
job, ctx := jobs.NewJob(context.Background(), in) func NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) {
go job.run(ctx, fn, in) return running.NewJob(ctx, fn, in)
return job
}
// StartAsyncJob starts a new job asynchronously and returns a Param suitable
// for output.
func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) {
job := running.NewAsyncJob(fn, in)
out := make(rc.Params)
out["jobid"] = job.ID
return out, nil
}
// ExecuteJob executes new job synchronously and returns a Param suitable for
// output.
func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) {
job, ctx := running.NewJob(ctx, in)
job.run(ctx, fn, in)
return job.Output, job.ID, job.realErr
} }
// OnFinish adds listener to jobid that will be triggered when job is finished. // OnFinish adds listener to jobid that will be triggered when job is finished.

View File

@ -36,14 +36,17 @@ func TestJobsKickExpire(t *testing.T) {
func TestJobsExpire(t *testing.T) { func TestJobsExpire(t *testing.T) {
testy.SkipUnreliable(t) testy.SkipUnreliable(t)
ctx := context.Background()
wait := make(chan struct{}) wait := make(chan struct{})
jobs := newJobs() jobs := newJobs()
jobs.opt.JobExpireInterval = time.Millisecond jobs.opt.JobExpireInterval = time.Millisecond
assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, false, jobs.expireRunning)
job := jobs.NewAsyncJob(func(ctx context.Context, in rc.Params) (rc.Params, error) { job, out, err := jobs.NewJob(ctx, func(ctx context.Context, in rc.Params) (rc.Params, error) {
defer close(wait) defer close(wait)
return in, nil return in, nil
}, rc.Params{}) }, rc.Params{"_async": true})
require.NoError(t, err)
assert.Equal(t, 1, len(out))
<-wait <-wait
assert.Equal(t, 1, len(jobs.jobs)) assert.Equal(t, 1, len(jobs.jobs))
jobs.Expire() jobs.Expire()
@ -66,9 +69,12 @@ var noopFn = func(ctx context.Context, in rc.Params) (rc.Params, error) {
} }
func TestJobsIDs(t *testing.T) { func TestJobsIDs(t *testing.T) {
ctx := context.Background()
jobs := newJobs() jobs := newJobs()
job1 := jobs.NewAsyncJob(noopFn, rc.Params{}) job1, _, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true})
job2 := jobs.NewAsyncJob(noopFn, rc.Params{}) require.NoError(t, err)
job2, _, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true})
require.NoError(t, err)
wantIDs := []int64{job1.ID, job2.ID} wantIDs := []int64{job1.ID, job2.ID}
gotIDs := jobs.IDs() gotIDs := jobs.IDs()
require.Equal(t, 2, len(gotIDs)) require.Equal(t, 2, len(gotIDs))
@ -79,8 +85,10 @@ func TestJobsIDs(t *testing.T) {
} }
func TestJobsGet(t *testing.T) { func TestJobsGet(t *testing.T) {
ctx := context.Background()
jobs := newJobs() jobs := newJobs()
job := jobs.NewAsyncJob(noopFn, rc.Params{}) job, _, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true})
require.NoError(t, err)
assert.Equal(t, job, jobs.Get(job.ID)) assert.Equal(t, job, jobs.Get(job.ID))
assert.Nil(t, jobs.Get(123123123123)) assert.Nil(t, jobs.Get(123123123123))
} }
@ -125,8 +133,10 @@ func sleepJob() {
} }
func TestJobFinish(t *testing.T) { func TestJobFinish(t *testing.T) {
ctx := context.Background()
jobs := newJobs() jobs := newJobs()
job := jobs.NewAsyncJob(longFn, rc.Params{}) job, _, err := jobs.NewJob(ctx, longFn, rc.Params{"_async": true})
require.NoError(t, err)
sleepJob() sleepJob()
assert.Equal(t, true, job.EndTime.IsZero()) assert.Equal(t, true, job.EndTime.IsZero())
@ -146,7 +156,8 @@ func TestJobFinish(t *testing.T) {
assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Success)
assert.Equal(t, true, job.Finished) assert.Equal(t, true, job.Finished)
job = jobs.NewAsyncJob(longFn, rc.Params{}) job, _, err = jobs.NewJob(ctx, longFn, rc.Params{"_async": true})
require.NoError(t, err)
sleepJob() sleepJob()
job.finish(nil, nil) job.finish(nil, nil)
@ -157,7 +168,8 @@ func TestJobFinish(t *testing.T) {
assert.Equal(t, true, job.Success) assert.Equal(t, true, job.Success)
assert.Equal(t, true, job.Finished) assert.Equal(t, true, job.Finished)
job = jobs.NewAsyncJob(longFn, rc.Params{}) job, _, err = jobs.NewJob(ctx, longFn, rc.Params{"_async": true})
require.NoError(t, err)
sleepJob() sleepJob()
job.finish(wantOut, errors.New("potato")) job.finish(wantOut, errors.New("potato"))
@ -172,6 +184,7 @@ func TestJobFinish(t *testing.T) {
// We've tested the functionality of run() already as it is // We've tested the functionality of run() already as it is
// part of NewJob, now just test the panic catching // part of NewJob, now just test the panic catching
func TestJobRunPanic(t *testing.T) { func TestJobRunPanic(t *testing.T) {
ctx := context.Background()
wait := make(chan struct{}) wait := make(chan struct{})
boom := func(ctx context.Context, in rc.Params) (rc.Params, error) { boom := func(ctx context.Context, in rc.Params) (rc.Params, error) {
sleepJob() sleepJob()
@ -180,7 +193,8 @@ func TestJobRunPanic(t *testing.T) {
} }
jobs := newJobs() jobs := newJobs()
job := jobs.NewAsyncJob(boom, rc.Params{}) job, _, err := jobs.NewJob(ctx, boom, rc.Params{"_async": true})
require.NoError(t, err)
<-wait <-wait
runtime.Gosched() // yield to make sure job is updated runtime.Gosched() // yield to make sure job is updated
@ -206,42 +220,50 @@ func TestJobRunPanic(t *testing.T) {
} }
func TestJobsNewJob(t *testing.T) { func TestJobsNewJob(t *testing.T) {
ctx := context.Background()
jobID = 0 jobID = 0
jobs := newJobs() jobs := newJobs()
job := jobs.NewAsyncJob(noopFn, rc.Params{}) job, out, err := jobs.NewJob(ctx, noopFn, rc.Params{"_async": true})
require.NoError(t, err)
assert.Equal(t, int64(1), job.ID) assert.Equal(t, int64(1), job.ID)
assert.Equal(t, rc.Params{"jobid": int64(1)}, out)
assert.Equal(t, job, jobs.Get(1)) assert.Equal(t, job, jobs.Get(1))
assert.NotEmpty(t, job.Stop) assert.NotEmpty(t, job.Stop)
} }
func TestStartJob(t *testing.T) { func TestStartJob(t *testing.T) {
ctx := context.Background()
jobID = 0 jobID = 0
out, err := StartAsyncJob(longFn, rc.Params{}) job, out, err := NewJob(ctx, longFn, rc.Params{"_async": true})
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, rc.Params{"jobid": int64(1)}, out) assert.Equal(t, rc.Params{"jobid": int64(1)}, out)
assert.Equal(t, int64(1), job.ID)
} }
func TestExecuteJob(t *testing.T) { func TestExecuteJob(t *testing.T) {
jobID = 0 jobID = 0
_, id, err := ExecuteJob(context.Background(), shortFn, rc.Params{}) job, out, err := NewJob(context.Background(), shortFn, rc.Params{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, int64(1), id) assert.Equal(t, int64(1), job.ID)
assert.Equal(t, rc.Params{}, out)
} }
func TestExecuteJobErrorPropagation(t *testing.T) { func TestExecuteJobErrorPropagation(t *testing.T) {
ctx := context.Background()
jobID = 0 jobID = 0
testErr := errors.New("test error") testErr := errors.New("test error")
errorFn := func(ctx context.Context, in rc.Params) (out rc.Params, err error) { errorFn := func(ctx context.Context, in rc.Params) (out rc.Params, err error) {
return nil, testErr return nil, testErr
} }
_, _, err := ExecuteJob(context.Background(), errorFn, rc.Params{}) _, _, err := NewJob(ctx, errorFn, rc.Params{})
assert.Equal(t, testErr, err) assert.Equal(t, testErr, err)
} }
func TestRcJobStatus(t *testing.T) { func TestRcJobStatus(t *testing.T) {
ctx := context.Background()
jobID = 0 jobID = 0
_, err := StartAsyncJob(longFn, rc.Params{}) _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true})
assert.NoError(t, err) assert.NoError(t, err)
call := rc.Calls.Get("job/status") call := rc.Calls.Get("job/status")
@ -267,8 +289,9 @@ func TestRcJobStatus(t *testing.T) {
} }
func TestRcJobList(t *testing.T) { func TestRcJobList(t *testing.T) {
ctx := context.Background()
jobID = 0 jobID = 0
_, err := StartAsyncJob(longFn, rc.Params{}) _, _, err := NewJob(ctx, longFn, rc.Params{"_async": true})
assert.NoError(t, err) assert.NoError(t, err)
call := rc.Calls.Get("job/list") call := rc.Calls.Get("job/list")
@ -281,8 +304,9 @@ func TestRcJobList(t *testing.T) {
} }
func TestRcAsyncJobStop(t *testing.T) { func TestRcAsyncJobStop(t *testing.T) {
ctx := context.Background()
jobID = 0 jobID = 0
_, err := StartAsyncJob(ctxFn, rc.Params{}) _, _, err := NewJob(ctx, ctxFn, rc.Params{"_async": true})
assert.NoError(t, err) assert.NoError(t, err)
call := rc.Calls.Get("job/stop") call := rc.Calls.Get("job/stop")
@ -320,9 +344,10 @@ func TestRcSyncJobStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
jobID = 0 jobID = 0
_, id, err := ExecuteJob(ctx, ctxFn, rc.Params{}) job, out, err := NewJob(ctx, ctxFn, rc.Params{})
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, int64(1), id) assert.Equal(t, int64(1), job.ID)
assert.Equal(t, rc.Params{}, out)
}() }()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -363,10 +388,10 @@ func TestOnFinish(t *testing.T) {
jobID = 0 jobID = 0
done := make(chan struct{}) done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
_, err := StartAsyncJob(ctxParmFn(ctx, false), rc.Params{}) job, _, err := NewJob(ctx, ctxParmFn(ctx, false), rc.Params{"_async": true})
assert.NoError(t, err) assert.NoError(t, err)
stop, err := OnFinish(jobID, func() { close(done) }) stop, err := OnFinish(job.ID, func() { close(done) })
defer stop() defer stop()
assert.NoError(t, err) assert.NoError(t, err)
@ -384,10 +409,10 @@ func TestOnFinishAlreadyFinished(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
_, id, err := ExecuteJob(ctx, shortFn, rc.Params{}) job, _, err := NewJob(ctx, shortFn, rc.Params{})
assert.NoError(t, err) assert.NoError(t, err)
stop, err := OnFinish(id, func() { close(done) }) stop, err := OnFinish(job.ID, func() { close(done) })
defer stop() defer stop()
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -229,6 +229,7 @@ func (s *Server) handler(w http.ResponseWriter, r *http.Request) {
} }
func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string) { func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string) {
ctx := r.Context()
contentType := r.Header.Get("Content-Type") contentType := r.Header.Get("Content-Type")
values := r.URL.Query() values := r.URL.Query()
@ -282,22 +283,10 @@ func (s *Server) handlePost(w http.ResponseWriter, r *http.Request, path string)
in["_response"] = w in["_response"] = w
} }
// Check to see if it is async or not
isAsync, err := in.GetBool("_async")
if rc.NotErrParamNotFound(err) {
writeError(path, inOrig, w, err, http.StatusBadRequest)
return
}
delete(in, "_async") // remove the async parameter after parsing so vfs operations don't get confused
fs.Debugf(nil, "rc: %q: with parameters %+v", path, in) fs.Debugf(nil, "rc: %q: with parameters %+v", path, in)
var out rc.Params job, out, err := jobs.NewJob(ctx, call.Fn, in)
if isAsync { if job != nil {
out, err = jobs.StartAsyncJob(call.Fn, in) w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", job.ID))
} else {
var jobID int64
out, jobID, err = jobs.ExecuteJob(r.Context(), call.Fn, in)
w.Header().Add("x-rclone-jobid", fmt.Sprintf("%d", jobID))
} }
if err != nil { if err != nil {
writeError(path, inOrig, w, err, http.StatusInternalServerError) writeError(path, inOrig, w, err, http.StatusInternalServerError)