Files
go-agent/service/serve.go

232 lines
4.8 KiB
Go

package service
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"fmt"
"golang.org/x/sys/unix"
"net"
"os"
"os/exec"
"sync/atomic"
"syscall"
"time"
pb "gitea.suyono.dev/suyono/go-agent/proto"
"gitea.suyono.dev/suyono/go-agent/storage"
"github.com/rs/zerolog"
"github.com/rs/zerolog/diode"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
// Option supplies the runtime configuration consumed by Serve and
// ExecBackground.
type Option interface {
	// Background reports whether the agent should be relaunched as a
	// detached background process.
	Background() bool
	// ExecArgs returns the command line used to launch the background
	// process; element 0 is the executable and the rest are its arguments.
	ExecArgs() []string
	// ExecEnv returns extra environment entries appended to the current
	// environment of the background process.
	ExecEnv() []string
	// ExecLogPath returns the log file path handed to the background
	// process; backgrounding is refused when it is empty.
	ExecLogPath() string

	// SocketPath returns the filesystem path of the unix socket to listen on.
	SocketPath() string
	// Umask returns the umask in effect while the socket is created.
	Umask() int
	// StopGraceTime returns how long shutdown waits for the server to
	// finish before it is stopped forcibly.
	StopGraceTime() time.Duration
}
// Storage is the key/value backend the gRPC handlers delegate to.
type Storage interface {
	// Set stores the second argument under the key given as the first.
	Set(string, string) error
	// Get returns the value stored under the given key.
	Get(string) (string, error)
	// Shutdown is invoked by the Shutdown RPC; what it tears down is up to
	// the implementation.
	Shutdown() error
}
// BgProcFlag tells the caller of ExecBackground which role the current
// process plays after the call.
type BgProcFlag int

const (
	// child means the current process should carry on and serve requests
	// itself.
	child = BgProcFlag(iota)
	// parent means the current process only launched (or tried to launch)
	// the background copy and should not serve.
	parent
)
// Names of the environment variables through which the launching process
// talks to its background copy.
const (
	// goAgentDaemonLogPath names the variable carrying the log file path
	// the background process must write to.
	goAgentDaemonLogPath = "GO_AGENT_DAEMON_LOG"
	// goAgentDaemonEnvFlag names the variable that is set to "1" in the
	// environment of the background process.
	goAgentDaemonEnvFlag = "GO_AGENT_DAEMON"
)
// server is the receiver for the Agent gRPC handlers in this file (Get, Set,
// Shutdown, Ping); it forwards cache operations to a Storage backend.
type server struct {
// Embedded type from the pb package.
// NOTE(review): presumably the generated stub that supplies default
// implementations for unimplemented RPCs; confirm against the proto package.
pb.UnimplementedAgentServer
// stor is the backend every handler delegates to.
stor Storage
}
// Get looks up the key carried by req in the storage backend.
//
// On success it returns a CacheValue with Status "OK" and the stored value.
// A storage error is returned unchanged together with a nil response.
func (s *server) Get(ctx context.Context, req *pb.CacheGetRequest) (*pb.CacheValue, error) {
	value, err := s.stor.Get(req.GetKey())
	if err != nil {
		return nil, err
	}
	return &pb.CacheValue{Status: "OK", Value: value}, nil
}
// Set stores the value carried by req under its key in the storage backend.
//
// On success it returns a SetStatus with Status "OK". A storage error is
// logged and then returned unchanged together with a nil response.
func (s *server) Set(ctx context.Context, req *pb.CacheSetRequest) (*pb.SetStatus, error) {
	if err := s.stor.Set(req.GetKey(), req.GetValue()); err != nil {
		log.Error().Err(err).Caller().Msg("failed to set cache")
		return nil, err
	}
	status := &pb.SetStatus{Status: "OK"}
	return status, nil
}
// Shutdown forwards the shutdown request to the storage backend.
//
// It returns an empty message on success, or a nil response and the
// backend's error on failure.
func (s *server) Shutdown(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
	err := s.stor.Shutdown()
	if err != nil {
		return nil, err
	}
	return new(emptypb.Empty), nil
}
// ExecBackground decides whether the current process serves requests itself
// or relaunches the agent as a detached background process.
//
// It returns child when the caller should continue serving:
//   - the process is already the background copy (GO_AGENT_DAEMON=1), in
//     which case the global zerolog logger is redirected to the file named
//     by GO_AGENT_DAEMON_LOG;
//   - opt.Background() is false; or
//   - opt.ExecArgs() is empty.
//
// Otherwise it starts opt.ExecArgs() in a new session with the daemon
// environment variables set and returns parent.
//
// An error is returned when the log path is missing, the log file cannot be
// opened, or the background process cannot be started. These conditions are
// reported through the error return rather than by panicking, so the caller
// can decide how to surface them.
func ExecBackground(opt Option) (BgProcFlag, error) {
	args := opt.ExecArgs()

	if os.Getenv(goAgentDaemonEnvFlag) == "1" {
		// We are the background copy: send logs to the file chosen by the
		// launching process.
		logPath := os.Getenv(goAgentDaemonLogPath)
		if logPath == "" {
			return child, fmt.Errorf("cannot start background process without log path")
		}
		f, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
		if err != nil {
			return child, fmt.Errorf("opening log file: %w", err)
		}
		// f is intentionally left open: it backs the logger for the rest of
		// the process lifetime.
		log.Logger = zerolog.New(diode.NewWriter(f, 1000, 10*time.Millisecond, func(missed int) {
			fmt.Printf("Logger Dropped %d messages", missed)
		}))
		return child, nil
	}

	if !opt.Background() || len(args) == 0 {
		return child, nil
	}

	if opt.ExecLogPath() == "" {
		return parent, fmt.Errorf("cannot start background process without log path")
	}

	// args[1:] is an empty slice when only the executable is given, so no
	// special case is needed for a single-element command line.
	cmd := exec.Command(args[0], args[1:]...)
	cmd.Env = append(os.Environ(), opt.ExecEnv()...)
	cmd.Env = append(cmd.Env,
		fmt.Sprintf("%s=1", goAgentDaemonEnvFlag),
		fmt.Sprintf("%s=%s", goAgentDaemonLogPath, opt.ExecLogPath()))
	// Setsid puts the child in its own session so it is detached from the
	// launching process's terminal.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
	// Nil standard streams are connected to the null device by os/exec.
	cmd.Stdin, cmd.Stdout, cmd.Stderr = nil, nil, nil

	if err := cmd.Start(); err != nil {
		return parent, err
	}
	// The child is never waited on, so release the process handle here.
	// Best effort: the child is already running, so a failure to release is
	// not reported as a launch failure.
	_ = cmd.Process.Release()
	return parent, nil
}
// Serve runs the agent's gRPC server on the unix socket named by opt.
//
// It first calls ExecBackground; when that reports parent, Serve returns nil
// immediately without listening. Otherwise it removes any existing file at
// opt.SocketPath(), creates the listener with opt.Umask() in effect, and
// serves until ctx is cancelled, the cancel function handed to the storage
// backend is invoked, or the gRPC server stops on its own.
//
// On shutdown the server is asked to stop gracefully; if it has not finished
// within opt.StopGraceTime() it is stopped forcibly. A non-positive grace
// time therefore forces the stop right away.
//
// The returned error is a wrapped ExecBackground or listen failure, or the
// error reported by the gRPC server's Serve call, if any.
func Serve(ctx context.Context, opt Option) error {
	bgFlag, err := ExecBackground(opt)
	if err != nil {
		return fmt.Errorf("could not start background process: %w", err)
	}
	if bgFlag == parent {
		return nil
	}

	// Best effort: clear a stale socket left by a previous run. A real
	// problem with the path surfaces as a Listen error below.
	_ = os.Remove(opt.SocketPath())

	// The umask is process-wide, so restore it right after Listen whether or
	// not the listener was created.
	oldUmask := unix.Umask(opt.Umask())
	l, err := net.Listen("unix", opt.SocketPath())
	unix.Umask(oldUmask)
	if err != nil {
		return fmt.Errorf("failed to listen on unix socket: %w", err)
	}

	grpcServer := grpc.NewServer()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	pb.RegisterAgentServer(grpcServer, &server{
		stor: storage.NewInMap(cancel),
	})

	// served flips to true once grpcServer.Serve has returned.
	var served atomic.Bool
	// Buffered so the serving goroutine never blocks on reporting its error.
	errChan := make(chan error, 1)
	go func() {
		serveErr := grpcServer.Serve(l)
		served.Store(true)
		if serveErr != nil {
			errChan <- serveErr
		}
		close(errChan)
		cancel()
	}()

	<-ctx.Done()

	// GracefulStop blocks until pending RPCs finish, so run it concurrently;
	// otherwise the grace timer below could never cut it short.
	if !served.Load() {
		go grpcServer.GracefulStop()
	}

	fst := time.NewTimer(opt.StopGraceTime())
	defer fst.Stop()

	err = nil
	for {
		select {
		case errC, ok := <-errChan:
			if !ok {
				// The serving goroutine has finished.
				return err
			}
			if errC != nil {
				err = errC
			}
		case <-fst.C:
			// Grace period expired: force the stop unless Serve has already
			// returned.
			if !served.Load() {
				grpcServer.Stop()
			}
			return err
		}
	}
}
// Ping is a liveness probe: it ignores its input and always answers with an
// empty message and a nil error.
func (s *server) Ping(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) {
	return new(emptypb.Empty), nil
}