package service /* Copyright 2025 Suyono Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ import ( "context" "fmt" "golang.org/x/sys/unix" "net" "os" "os/exec" "sync/atomic" "syscall" "time" pb "gitea.suyono.dev/suyono/go-agent/proto" "gitea.suyono.dev/suyono/go-agent/storage" "github.com/rs/zerolog" "github.com/rs/zerolog/diode" "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" ) type Option interface { SocketPath() string StopGraceTime() time.Duration Background() bool ExecArgs() []string ExecEnv() []string ExecLogPath() string Umask() int } type Storage interface { Get(string) (string, error) Set(string, string) error Shutdown() error } type BgProcFlag int const ( child BgProcFlag = iota parent ) const ( goAgentDaemonEnvFlag = "GO_AGENT_DAEMON" goAgentDaemonLogPath = "GO_AGENT_DAEMON_LOG" ) type server struct { pb.UnimplementedAgentServer stor Storage } func (s *server) Get(ctx context.Context, req *pb.CacheGetRequest) (*pb.CacheValue, error) { val, err := s.stor.Get(req.GetKey()) if err != nil { return nil, err } result := &pb.CacheValue{ Status: "OK", Value: val, } return result, nil } func (s *server) Set(ctx context.Context, req *pb.CacheSetRequest) (*pb.SetStatus, error) { err := s.stor.Set(req.GetKey(), req.GetValue()) if err != nil { log.Error().Err(err).Caller().Msg("failed to set cache") return nil, err } return &pb.SetStatus{ Status: "OK", }, nil } func (s *server) Shutdown(context.Context, *emptypb.Empty) (*emptypb.Empty, error) { if err := s.stor.Shutdown(); err != nil { return nil, err } return &emptypb.Empty{}, nil } func ExecBackground(opt Option) (BgProcFlag, error) { var ( cmd *exec.Cmd envVal string logPath string err error f *os.File ) args := opt.ExecArgs() envVal = os.Getenv(goAgentDaemonEnvFlag) if !opt.Background() || len(args) == 0 || envVal == "1" { if envVal == "1" { logPath = os.Getenv(goAgentDaemonLogPath) if logPath == "" { panic(fmt.Errorf("cannot start background process without log path")) } if f, err = os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600); err != nil { panic(fmt.Errorf("opening log file: %w", err)) } log.Logger = zerolog.New(diode.NewWriter(f, 1000, 10*time.Millisecond, func(missed int) { fmt.Printf("Logger Dropped %d messages", missed) })) } return child, nil } if opt.ExecLogPath() == "" { return parent, fmt.Errorf("cannot start background process without log path") } if len(args) > 1 { cmd = exec.Command(args[0], args[1:]...) } else { cmd = exec.Command(args[0]) } cmd.Env = append(os.Environ(), opt.ExecEnv()...) cmd.Env = append(cmd.Env, fmt.Sprintf("%s=1", goAgentDaemonEnvFlag), fmt.Sprintf("%s=%s", goAgentDaemonLogPath, opt.ExecLogPath())) cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} cmd.Stdin, cmd.Stdout, cmd.Stderr = nil, nil, nil if err = cmd.Start(); err != nil { return parent, err } return parent, nil } func Serve(ctx context.Context, opt Option) error { var ( l net.Listener err error errC error ok bool sw *atomic.Bool cancel context.CancelFunc bgFlag BgProcFlag ) if bgFlag, err = ExecBackground(opt); err != nil { return fmt.Errorf("could not start background process: %w", err) } if bgFlag == parent { return nil } _ = os.Remove(opt.SocketPath()) oldUmask := unix.Umask(opt.Umask()) if l, err = net.Listen("unix", opt.SocketPath()); err != nil { return fmt.Errorf("failed to listen on unix socket: %w", err) } unix.Umask(oldUmask) grpcServer := grpc.NewServer() ctx, cancel = context.WithCancel(ctx) pb.RegisterAgentServer(grpcServer, &server{ stor: storage.NewInMap(cancel), }) errChan := make(chan error, 1) sw = new(atomic.Bool) sw.Store(false) go func() { err := grpcServer.Serve(l) sw.Swap(true) if err != nil { errChan <- err } close(errChan) cancel() }() <-ctx.Done() if !sw.Load() { grpcServer.GracefulStop() } fst := time.NewTimer(opt.StopGraceTime()) err = nil fsl: for { select { case errC, ok = <-errChan: if !ok { break fsl } if errC != nil { err = errC } case <-fst.C: if !sw.Load() { grpcServer.Stop() } break fsl } } return err } func (s *server) Ping(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { return &emptypb.Empty{}, nil }