232 lines
4.8 KiB
Go
232 lines
4.8 KiB
Go
package service
|
|
|
|
/*
|
|
Copyright 2025 Suyono <suyono3484@gmail.com>
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"golang.org/x/sys/unix"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
pb "gitea.suyono.dev/suyono/go-agent/proto"
|
|
"gitea.suyono.dev/suyono/go-agent/storage"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/diode"
|
|
"github.com/rs/zerolog/log"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
)
|
|
|
|
type Option interface {
|
|
SocketPath() string
|
|
StopGraceTime() time.Duration
|
|
Background() bool
|
|
ExecArgs() []string
|
|
ExecEnv() []string
|
|
ExecLogPath() string
|
|
Umask() int
|
|
}
|
|
|
|
type Storage interface {
|
|
Get(string) (string, error)
|
|
Set(string, string) error
|
|
Shutdown() error
|
|
}
|
|
|
|
type BgProcFlag int
|
|
|
|
const (
|
|
child BgProcFlag = iota
|
|
parent
|
|
)
|
|
|
|
const (
|
|
goAgentDaemonEnvFlag = "GO_AGENT_DAEMON"
|
|
goAgentDaemonLogPath = "GO_AGENT_DAEMON_LOG"
|
|
)
|
|
|
|
type server struct {
|
|
pb.UnimplementedAgentServer
|
|
stor Storage
|
|
}
|
|
|
|
func (s *server) Get(ctx context.Context, req *pb.CacheGetRequest) (*pb.CacheValue, error) {
|
|
val, err := s.stor.Get(req.GetKey())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := &pb.CacheValue{
|
|
Status: "OK",
|
|
Value: val,
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (s *server) Set(ctx context.Context, req *pb.CacheSetRequest) (*pb.SetStatus, error) {
|
|
err := s.stor.Set(req.GetKey(), req.GetValue())
|
|
if err != nil {
|
|
log.Error().Err(err).Caller().Msg("failed to set cache")
|
|
return nil, err
|
|
}
|
|
return &pb.SetStatus{
|
|
Status: "OK",
|
|
}, nil
|
|
}
|
|
|
|
func (s *server) Shutdown(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
|
|
if err := s.stor.Shutdown(); err != nil {
|
|
return nil, err
|
|
}
|
|
return &emptypb.Empty{}, nil
|
|
}
|
|
|
|
func ExecBackground(opt Option) (BgProcFlag, error) {
|
|
var (
|
|
cmd *exec.Cmd
|
|
envVal string
|
|
logPath string
|
|
err error
|
|
f *os.File
|
|
)
|
|
|
|
args := opt.ExecArgs()
|
|
envVal = os.Getenv(goAgentDaemonEnvFlag)
|
|
if !opt.Background() || len(args) == 0 || envVal == "1" {
|
|
if envVal == "1" {
|
|
logPath = os.Getenv(goAgentDaemonLogPath)
|
|
if logPath == "" {
|
|
panic(fmt.Errorf("cannot start background process without log path"))
|
|
}
|
|
|
|
if f, err = os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600); err != nil {
|
|
panic(fmt.Errorf("opening log file: %w", err))
|
|
}
|
|
|
|
log.Logger = zerolog.New(diode.NewWriter(f, 1000, 10*time.Millisecond, func(missed int) {
|
|
fmt.Printf("Logger Dropped %d messages", missed)
|
|
}))
|
|
}
|
|
|
|
return child, nil
|
|
}
|
|
|
|
if opt.ExecLogPath() == "" {
|
|
return parent, fmt.Errorf("cannot start background process without log path")
|
|
}
|
|
|
|
if len(args) > 1 {
|
|
cmd = exec.Command(args[0], args[1:]...)
|
|
} else {
|
|
cmd = exec.Command(args[0])
|
|
}
|
|
cmd.Env = append(os.Environ(), opt.ExecEnv()...)
|
|
cmd.Env = append(cmd.Env,
|
|
fmt.Sprintf("%s=1", goAgentDaemonEnvFlag),
|
|
fmt.Sprintf("%s=%s", goAgentDaemonLogPath, opt.ExecLogPath()))
|
|
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
|
|
cmd.Stdin, cmd.Stdout, cmd.Stderr = nil, nil, nil
|
|
|
|
if err = cmd.Start(); err != nil {
|
|
return parent, err
|
|
}
|
|
|
|
return parent, nil
|
|
}
|
|
|
|
func Serve(ctx context.Context, opt Option) error {
|
|
var (
|
|
l net.Listener
|
|
err error
|
|
errC error
|
|
ok bool
|
|
sw *atomic.Bool
|
|
cancel context.CancelFunc
|
|
bgFlag BgProcFlag
|
|
)
|
|
|
|
if bgFlag, err = ExecBackground(opt); err != nil {
|
|
return fmt.Errorf("could not start background process: %w", err)
|
|
}
|
|
if bgFlag == parent {
|
|
return nil
|
|
}
|
|
|
|
_ = os.Remove(opt.SocketPath())
|
|
|
|
oldUmask := unix.Umask(opt.Umask())
|
|
if l, err = net.Listen("unix", opt.SocketPath()); err != nil {
|
|
return fmt.Errorf("failed to listen on unix socket: %w", err)
|
|
}
|
|
unix.Umask(oldUmask)
|
|
|
|
grpcServer := grpc.NewServer()
|
|
ctx, cancel = context.WithCancel(ctx)
|
|
pb.RegisterAgentServer(grpcServer, &server{
|
|
stor: storage.NewInMap(cancel),
|
|
})
|
|
errChan := make(chan error, 1)
|
|
sw = new(atomic.Bool)
|
|
sw.Store(false)
|
|
|
|
go func() {
|
|
err := grpcServer.Serve(l)
|
|
sw.Swap(true)
|
|
if err != nil {
|
|
errChan <- err
|
|
}
|
|
close(errChan)
|
|
cancel()
|
|
}()
|
|
|
|
<-ctx.Done()
|
|
if !sw.Load() {
|
|
grpcServer.GracefulStop()
|
|
}
|
|
|
|
fst := time.NewTimer(opt.StopGraceTime())
|
|
err = nil
|
|
fsl:
|
|
for {
|
|
select {
|
|
case errC, ok = <-errChan:
|
|
if !ok {
|
|
break fsl
|
|
}
|
|
if errC != nil {
|
|
err = errC
|
|
}
|
|
case <-fst.C:
|
|
if !sw.Load() {
|
|
grpcServer.Stop()
|
|
}
|
|
break fsl
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *server) Ping(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) {
|
|
return &emptypb.Empty{}, nil
|
|
}
|