gitea/routers/api/actions/runner/runner.go

222 lines
6.0 KiB
Go

// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package runner
import (
"context"
"errors"
"net/http"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
actions_service "code.gitea.io/gitea/services/actions"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
"github.com/bufbuild/connect-go"
gouuid "github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func NewRunnerServiceHandler() (string, http.Handler) {
return runnerv1connect.NewRunnerServiceHandler(
&Service{},
connect.WithCompressMinBytes(1024),
withRunner,
)
}
var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
type Service struct {
runnerv1connect.UnimplementedRunnerServiceHandler
}
// Register for new runner.
func (s *Service) Register(
ctx context.Context,
req *connect.Request[runnerv1.RegisterRequest],
) (*connect.Response[runnerv1.RegisterResponse], error) {
if req.Msg.Token == "" || req.Msg.Name == "" {
return nil, errors.New("missing runner token, name")
}
runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
if err != nil {
return nil, errors.New("runner token not found")
}
if runnerToken.IsActive {
return nil, errors.New("runner token has already activated")
}
// create new runner
runner := &actions_model.ActionRunner{
UUID: gouuid.New().String(),
Name: req.Msg.Name,
OwnerID: runnerToken.OwnerID,
RepoID: runnerToken.RepoID,
AgentLabels: req.Msg.AgentLabels,
CustomLabels: req.Msg.CustomLabels,
}
if err := runner.GenerateToken(); err != nil {
return nil, errors.New("can't generate token")
}
// create new runner
if err := actions_model.CreateRunner(ctx, runner); err != nil {
return nil, errors.New("can't create new runner")
}
// update token status
runnerToken.IsActive = true
if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
return nil, errors.New("can't update runner token status")
}
res := connect.NewResponse(&runnerv1.RegisterResponse{
Runner: &runnerv1.Runner{
Id: runner.ID,
Uuid: runner.UUID,
Token: runner.Token,
Name: runner.Name,
AgentLabels: runner.AgentLabels,
CustomLabels: runner.CustomLabels,
},
})
return res, nil
}
// FetchTask assigns a task to the runner
func (s *Service) FetchTask(
ctx context.Context,
req *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx)
var task *runnerv1.Task
if t, ok, err := pickTask(ctx, runner); err != nil {
log.Error("pick task failed: %v", err)
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
} else if ok {
task = t
}
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: task,
})
return res, nil
}
// UpdateTask updates the task status.
func (s *Service) UpdateTask(
ctx context.Context,
req *connect.Request[runnerv1.UpdateTaskRequest],
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
{
// to debug strange runner behaviors, it could be removed if all problems have been solved.
stateMsg, _ := json.Marshal(req.Msg.State)
log.Trace("update task with state: %s", stateMsg)
}
// Get Task first
task, err := actions_model.GetTaskByID(ctx, req.Msg.State.Id)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't find the task: %v", err)
}
if task.Status.IsCancelled() {
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
State: &runnerv1.TaskState{
Id: req.Msg.State.Id,
Result: task.Status.AsResult(),
},
}), nil
}
task, err = actions_model.UpdateTaskByState(ctx, req.Msg.State)
if err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
if err := task.LoadJob(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "load job: %v", err)
}
if err := actions_service.CreateCommitStatus(ctx, task.Job); err != nil {
log.Error("Update commit status failed: %v", err)
// go on
}
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
}
}
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
State: &runnerv1.TaskState{
Id: req.Msg.State.Id,
Result: task.Status.AsResult(),
},
}), nil
}
// UpdateLog uploads log of the task.
func (s *Service) UpdateLog(
ctx context.Context,
req *connect.Request[runnerv1.UpdateLogRequest],
) (*connect.Response[runnerv1.UpdateLogResponse], error) {
res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
if err != nil {
return nil, status.Errorf(codes.Internal, "get task: %v", err)
}
ack := task.LogLength
if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
res.Msg.AckIndex = ack
return res, nil
}
if task.LogInStorage {
return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
}
rows := req.Msg.Rows[ack-req.Msg.Index:]
ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
if err != nil {
return nil, status.Errorf(codes.Internal, "write logs: %v", err)
}
task.LogLength += int64(len(rows))
for _, n := range ns {
task.LogIndexes = append(task.LogIndexes, task.LogSize)
task.LogSize += int64(n)
}
res.Msg.AckIndex = task.LogLength
var remove func()
if req.Msg.NoMore {
task.LogInStorage = true
remove, err = actions.TransferLogs(ctx, task.LogFilename)
if err != nil {
return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
}
}
if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
if remove != nil {
remove()
}
return res, nil
}