diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 5296bfe70..049f6ef6d 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -184,15 +184,15 @@ func getGroup(in rc.Params) string { return group } -// NewAsyncJob start a new asynchronous Job off -func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { +// NewJob creates a Job ready to be executed +func (jobs *Jobs) NewJob(ctx context.Context, in rc.Params) (*Job, context.Context) { id := atomic.AddInt64(&jobID, 1) group := getGroup(in) if group == "" { group = fmt.Sprintf("job/%d", id) } - ctx := accounting.WithStatsGroup(context.Background(), group) + ctx = accounting.WithStatsGroup(ctx, group) ctx, cancel := context.WithCancel(ctx) stop := func() { cancel() @@ -208,36 +208,16 @@ func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { jobs.mu.Lock() jobs.jobs[job.ID] = job jobs.mu.Unlock() - go job.run(ctx, fn, in) - return job -} - -// NewSyncJob start a new synchronous Job off -func (jobs *Jobs) NewSyncJob(ctx context.Context, in rc.Params) (*Job, context.Context) { - id := atomic.AddInt64(&jobID, 1) - group := getGroup(in) - if group == "" { - group = fmt.Sprintf("job/%d", id) - } - ctxG := accounting.WithStatsGroup(ctx, fmt.Sprintf("job/%d", id)) - ctx, cancel := context.WithCancel(ctxG) - stop := func() { - cancel() - // Wait for cancel to propagate before returning. - <-ctx.Done() - } - job := &Job{ - ID: id, - Group: group, - StartTime: time.Now(), - Stop: stop, - } - jobs.mu.Lock() - jobs.jobs[job.ID] = job - jobs.mu.Unlock() return job, ctx } +// NewAsyncJob start a new asynchronous Job off +func (jobs *Jobs) NewAsyncJob(fn rc.Func, in rc.Params) *Job { + job, ctx := jobs.NewJob(context.Background(), in) + go job.run(ctx, fn, in) + return job +} + // StartAsyncJob starts a new job asynchronously and returns a Param suitable // for output. func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) { @@ -250,7 +230,7 @@ func StartAsyncJob(fn rc.Func, in rc.Params) (rc.Params, error) { // ExecuteJob executes new job synchronously and returns a Param suitable for // output. func ExecuteJob(ctx context.Context, fn rc.Func, in rc.Params) (rc.Params, int64, error) { - job, ctx := running.NewSyncJob(ctx, in) + job, ctx := running.NewJob(ctx, in) job.run(ctx, fn, in) return job.Output, job.ID, job.realErr }