rclone/fs/rc/jobs/job.go

409 lines
9.4 KiB
Go

// Manage background jobs that the rc is running
package jobs
import (
"context"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/rc"
)
// Job describes an asynchronous task started via the rc package
type Job struct {
mu sync.Mutex
ID int64 `json:"id"`
Group string `json:"group"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
Error string `json:"error"`
Finished bool `json:"finished"`
Success bool `json:"success"`
Duration float64 `json:"duration"`
Output rc.Params `json:"output"`
Stop func() `json:"-"`
listeners []*func()
// realErr is the Error before printing it as a string, it's used to return
// the real error to the upper application layers while still printing the
// string error message.
realErr error
}
// mark the job as finished
func (job *Job) finish(out rc.Params, err error) {
job.mu.Lock()
job.EndTime = time.Now()
if out == nil {
out = make(rc.Params)
}
job.Output = out
job.Duration = job.EndTime.Sub(job.StartTime).Seconds()
if err != nil {
job.realErr = err
job.Error = err.Error()
job.Success = false
} else {
job.realErr = nil
job.Error = ""
job.Success = true
}
job.Finished = true
// Notify listeners that the job is finished
for i := range job.listeners {
go (*job.listeners[i])()
}
job.mu.Unlock()
running.kickExpire() // make sure this job gets expired
}
func (job *Job) addListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
job.listeners = append(job.listeners, fn)
}
func (job *Job) removeListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
for i, ln := range job.listeners {
if ln == fn {
job.listeners = append(job.listeners[:i], job.listeners[i+1:]...)
return
}
}
}
// run the job until completion writing the return status
func (job *Job) run(ctx context.Context, fn rc.Func, in rc.Params) {
defer func() {
if r := recover(); r != nil {
job.finish(nil, errors.Errorf("panic received: %v \n%s", r, string(debug.Stack())))
}
}()
job.finish(fn(ctx, in))
}
// Jobs describes a collection of running tasks
type Jobs struct {
mu sync.RWMutex
jobs map[int64]*Job
opt *rc.Options
expireRunning bool
}
var (
running = newJobs()
jobID = int64(0)
)
// newJobs makes a new Jobs structure
func newJobs() *Jobs {
return &Jobs{
jobs: map[int64]*Job{},
opt: &rc.DefaultOpt,
}
}
// SetOpt sets the options when they are known
func SetOpt(opt *rc.Options) {
running.opt = opt
}
// SetInitialJobID allows for setting jobID before starting any jobs.
func SetInitialJobID(id int64) {
if !atomic.CompareAndSwapInt64(&jobID, 0, id) {
panic("Setting jobID is only possible before starting any jobs")
}
}
// kickExpire makes sure Expire is running
func (jobs *Jobs) kickExpire() {
jobs.mu.Lock()
defer jobs.mu.Unlock()
if !jobs.expireRunning {
time.AfterFunc(jobs.opt.JobExpireInterval, jobs.Expire)
jobs.expireRunning = true
}
}
// Expire expires any jobs that haven't been collected
func (jobs *Jobs) Expire() {
jobs.mu.Lock()
defer jobs.mu.Unlock()
now := time.Now()
for ID, job := range jobs.jobs {
job.mu.Lock()
if job.Finished && now.Sub(job.EndTime) > jobs.opt.JobExpireDuration {
delete(jobs.jobs, ID)
}
job.mu.Unlock()
}
if len(jobs.jobs) != 0 {
time.AfterFunc(jobs.opt.JobExpireInterval, jobs.Expire)
jobs.expireRunning = true
} else {
jobs.expireRunning = false
}
}
// IDs returns the IDs of the running jobs
func (jobs *Jobs) IDs() (IDs []int64) {
jobs.mu.RLock()
defer jobs.mu.RUnlock()
IDs = []int64{}
for ID := range jobs.jobs {
IDs = append(IDs, ID)
}
return IDs
}
// Get a job with a given ID or nil if it doesn't exist
func (jobs *Jobs) Get(ID int64) *Job {
jobs.mu.RLock()
defer jobs.mu.RUnlock()
return jobs.jobs[ID]
}
// Check to see if the group is set
func getGroup(ctx context.Context, in rc.Params, id int64) (context.Context, string, error) {
group, err := in.GetString("_group")
if rc.NotErrParamNotFound(err) {
return ctx, "", err
}
delete(in, "_group")
if group == "" {
group = fmt.Sprintf("job/%d", id)
}
ctx = accounting.WithStatsGroup(ctx, group)
return ctx, group, nil
}
// See if _async is set returning a boolean and a possible new context
func getAsync(ctx context.Context, in rc.Params) (context.Context, bool, error) {
isAsync, err := in.GetBool("_async")
if rc.NotErrParamNotFound(err) {
return ctx, false, err
}
delete(in, "_async") // remove the async parameter after parsing
if isAsync {
// unlink this job from the current context
ctx = context.Background()
}
return ctx, isAsync, nil
}
// See if _config is set and if so adjust ctx to include it
func getConfig(ctx context.Context, in rc.Params) (context.Context, error) {
if _, ok := in["_config"]; !ok {
return ctx, nil
}
ctx, ci := fs.AddConfig(ctx)
err := in.GetStruct("_config", ci)
if err != nil {
return ctx, err
}
delete(in, "_config") // remove the parameter
return ctx, nil
}
// See if _filter is set and if so adjust ctx to include it
func getFilter(ctx context.Context, in rc.Params) (context.Context, error) {
if _, ok := in["_filter"]; !ok {
return ctx, nil
}
// Copy of the current filter options
opt := filter.GetConfig(ctx).Opt
// Update the options from the parameter
err := in.GetStruct("_filter", &opt)
if err != nil {
return ctx, err
}
fi, err := filter.NewFilter(&opt)
if err != nil {
return ctx, err
}
ctx = filter.ReplaceConfig(ctx, fi)
delete(in, "_filter") // remove the parameter
return ctx, nil
}
// NewJob creates a Job and executes it, possibly in the background if _async is set
func (jobs *Jobs) NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) {
id := atomic.AddInt64(&jobID, 1)
in = in.Copy() // copy input so we can change it
ctx, isAsync, err := getAsync(ctx, in)
if err != nil {
return nil, nil, err
}
ctx, err = getConfig(ctx, in)
if err != nil {
return nil, nil, err
}
ctx, err = getFilter(ctx, in)
if err != nil {
return nil, nil, err
}
ctx, group, err := getGroup(ctx, in, id)
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithCancel(ctx)
stop := func() {
cancel()
// Wait for cancel to propagate before returning.
<-ctx.Done()
}
job = &Job{
ID: id,
Group: group,
StartTime: time.Now(),
Stop: stop,
}
jobs.mu.Lock()
jobs.jobs[job.ID] = job
jobs.mu.Unlock()
if isAsync {
go job.run(ctx, fn, in)
out = make(rc.Params)
out["jobid"] = job.ID
err = nil
} else {
job.run(ctx, fn, in)
out = job.Output
err = job.realErr
}
return job, out, err
}
// NewJob creates a Job and executes it on the global job queue,
// possibly in the background if _async is set
func NewJob(ctx context.Context, fn rc.Func, in rc.Params) (job *Job, out rc.Params, err error) {
return running.NewJob(ctx, fn, in)
}
// OnFinish adds listener to jobid that will be triggered when job is finished.
// It returns a function to cancel listening.
func OnFinish(jobID int64, fn func()) (func(), error) {
job := running.Get(jobID)
if job == nil {
return func() {}, errors.New("job not found")
}
if job.Finished {
fn()
} else {
job.addListener(&fn)
}
return func() { job.removeListener(&fn) }, nil
}
func init() {
rc.Add(rc.Call{
Path: "job/status",
Fn: rcJobStatus,
Title: "Reads the status of the job ID",
Help: `Parameters
- jobid - id of the job (integer)
Results
- finished - boolean
- duration - time in seconds that the job ran for
- endTime - time the job finished (e.g. "2018-10-26T18:50:20.528746884+01:00")
- error - error from the job or empty string for no error
- finished - boolean whether the job has finished or not
- id - as passed in above
- startTime - time the job started (e.g. "2018-10-26T18:50:20.528336039+01:00")
- success - boolean - true for success false otherwise
- output - output of the job as would have been returned if called synchronously
- progress - output of the progress related to the underlying job
`,
})
}
// Returns the status of a job
func rcJobStatus(ctx context.Context, in rc.Params) (out rc.Params, err error) {
jobID, err := in.GetInt64("jobid")
if err != nil {
return nil, err
}
job := running.Get(jobID)
if job == nil {
return nil, errors.New("job not found")
}
job.mu.Lock()
defer job.mu.Unlock()
out = make(rc.Params)
err = rc.Reshape(&out, job)
if err != nil {
return nil, errors.Wrap(err, "reshape failed in job status")
}
return out, nil
}
func init() {
rc.Add(rc.Call{
Path: "job/list",
Fn: rcJobList,
Title: "Lists the IDs of the running jobs",
Help: `Parameters - None
Results
- jobids - array of integer job ids
`,
})
}
// Returns list of job ids.
func rcJobList(ctx context.Context, in rc.Params) (out rc.Params, err error) {
out = make(rc.Params)
out["jobids"] = running.IDs()
return out, nil
}
func init() {
rc.Add(rc.Call{
Path: "job/stop",
Fn: rcJobStop,
Title: "Stop the running job",
Help: `Parameters
- jobid - id of the job (integer)
`,
})
}
// Stops the running job.
func rcJobStop(ctx context.Context, in rc.Params) (out rc.Params, err error) {
jobID, err := in.GetInt64("jobid")
if err != nil {
return nil, err
}
job := running.Get(jobID)
if job == nil {
return nil, errors.New("job not found")
}
job.mu.Lock()
defer job.mu.Unlock()
out = make(rc.Params)
job.Stop()
return out, nil
}