Compare commits

..

3 Commits

Author SHA1 Message Date
dd66cb9f1e test: prepare 2023-12-09 08:40:07 +00:00
d5eb872b13 fix: mismatch WaitGroup 2023-12-09 04:29:38 +00:00
9128503da1 wip: pipe stdout & stderr 2023-12-09 02:44:06 +00:00
21 changed files with 519 additions and 31 deletions

View File

@ -16,7 +16,8 @@
"customizations": { "customizations": {
"vscode": { "vscode": {
"extensions": [ "extensions": [
"golang.go" "golang.go",
"ms-azuretools.vscode-docker"
] ]
} }
} }

4
cmd/experiment/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/dummy/dummy
/starter/starter
/oneshot/oneshot
/spawner/spawner

View File

@ -0,0 +1,18 @@
package main
import (
"fmt"
"log"
"time"
)
func main() {
for i := 0; i < 10; i++ {
log.Println("using log.Println")
fmt.Println("using fmt.Println")
time.Sleep(time.Millisecond * 500)
}
log.Println("log: finishing up")
fmt.Println("fmt: finishing up")
}

View File

@ -0,0 +1,67 @@
package main
import (
"log"
"math/rand"
"os"
"os/exec"
"gitea.suyono.dev/suyono/wingmate"
"github.com/spf13/viper"
)
const (
// DummyPath = "/workspaces/wingmate/cmd/experiment/dummy/dummy"
DummyPath = "/usr/local/bin/wmdummy"
EnvDummyPath = "DUMMY_PATH"
EnvPrefix = "WINGMATE"
EnvLog = "LOG"
EnvLogMessage = "LOG_MESSAGE"
EnvDefaultLogMessage = "oneshot executed"
)
func main() {
viper.SetEnvPrefix(EnvPrefix)
viper.BindEnv(EnvDummyPath, EnvLog, EnvLogMessage)
viper.SetDefault(EnvDummyPath, DummyPath)
viper.SetDefault(EnvLogMessage, EnvDefaultLogMessage)
exePath := viper.GetString(EnvDummyPath)
logPath := viper.GetString(EnvLog)
logMessage := viper.GetString(EnvLogMessage)
if logPath != "" {
var (
err error
file *os.File
)
if file, err = os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o666); err == nil {
defer func() {
_ = file.Close()
}()
if err = wingmate.NewLog(file); err == nil {
wingmate.Log().Info().Msg(logMessage)
}
}
}
StartRandomInstances(exePath)
}
func StartRandomInstances(exePath string) {
num := (rand.Uint32() % 16) + 16
var (
ctr uint32
cmd *exec.Cmd
err error
)
for ctr = 0; ctr < num; ctr++ {
cmd = exec.Command(exePath)
if err = cmd.Start(); err != nil {
log.Printf("failed to run %s: %+v\n", exePath, err)
}
}
}

View File

@ -0,0 +1,40 @@
package main
import (
"log"
"os/exec"
"time"
"github.com/spf13/viper"
)
const (
EnvPrefix = "WINGMATE"
EnvOneShotPath = "ONESHOT_PATH"
OneShotPath = "/usr/local/bin/wmoneshot"
)
func main() {
var (
cmd *exec.Cmd
err error
t *time.Ticker
)
viper.SetEnvPrefix(EnvPrefix)
viper.BindEnv(EnvOneShotPath)
viper.SetDefault(EnvOneShotPath, OneShotPath)
exePath := viper.GetString(EnvOneShotPath)
t = time.NewTicker(time.Second * 5)
for {
cmd = exec.Command(exePath)
if err = cmd.Run(); err != nil {
log.Printf("failed to run %s: %+v\n", exePath, err)
} else {
log.Printf("%s executed\n", exePath)
}
<-t.C
}
}

View File

@ -0,0 +1,73 @@
package main
import (
"bufio"
"io"
"log"
"os/exec"
"sync"
"github.com/spf13/viper"
)
const (
// DummyPath = "/workspaces/wingmate/cmd/experiment/dummy/dummy"
DummyPath = "/usr/local/bin/wmdummy"
EnvDummyPath = "DUMMY_PATH"
EnvPrefix = "WINGMATE"
)
func main() {
var (
stdout io.ReadCloser
stderr io.ReadCloser
wg *sync.WaitGroup
err error
exePath string
)
viper.SetEnvPrefix(EnvPrefix)
viper.BindEnv(EnvDummyPath)
viper.SetDefault(EnvDummyPath, DummyPath)
exePath = viper.GetString(EnvDummyPath)
cmd := exec.Command(exePath)
if stdout, err = cmd.StdoutPipe(); err != nil {
log.Panic(err)
}
if stderr, err = cmd.StderrPipe(); err != nil {
log.Panic(err)
}
wg = &sync.WaitGroup{}
wg.Add(2)
go pulley(wg, stdout, "stdout")
go pulley(wg, stderr, "stderr")
if err = cmd.Start(); err != nil {
log.Panic(err)
}
wg.Wait()
if err = cmd.Wait(); err != nil {
log.Printf("got error when Waiting for child process: %#v\n", err)
}
}
func pulley(wg *sync.WaitGroup, src io.ReadCloser, srcName string) {
defer wg.Done()
scanner := bufio.NewScanner(src)
for scanner.Scan() {
log.Printf("coming out from %s: %s\n", srcName, scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Printf("got error whean reading from %s: %#v\n", srcName, err)
}
log.Printf("closing %s...\n", srcName)
_ = src.Close()
}

View File

@ -54,7 +54,7 @@ func Read() (*Config, error) {
} }
} }
if err != nil { if err != nil {
wingmate.Log().Error().Msgf("encounter error when reading service directory %s: %#v", svcdir, err) wingmate.Log().Error().Msgf("encounter error when reading service directory %s: %+v", svcdir, err)
} }
crontabfile = filepath.Join(configPath, CrontabFileName) crontabfile = filepath.Join(configPath, CrontabFileName)
@ -64,7 +64,7 @@ func Read() (*Config, error) {
cronAvailable = true cronAvailable = true
} }
if err != nil { if err != nil {
wingmate.Log().Error().Msgf("encounter error when reading crontab %s: %#v", crontabfile, err) wingmate.Log().Error().Msgf("encounter error when reading crontab %s: %+v", crontabfile, err)
} }
if !serviceAvailable && !cronAvailable { if !serviceAvailable && !cronAvailable {

View File

@ -89,27 +89,27 @@ func readCrontab(path string) ([]*Cron, error) {
hasRun: false, hasRun: false,
} }
if err = c.setField(minute, parts[1]); err != nil { if err = c.setField(minute, parts[1]); err != nil {
wingmate.Log().Error().Msgf("error parsing minute field %#v", err) wingmate.Log().Error().Msgf("error parsing minute field %+v", err)
continue continue
} }
if err = c.setField(hour, parts[2]); err != nil { if err = c.setField(hour, parts[2]); err != nil {
wingmate.Log().Error().Msgf("error parsing hour field %#v", err) wingmate.Log().Error().Msgf("error parsing hour field %+v", err)
continue continue
} }
if err = c.setField(dom, parts[3]); err != nil { if err = c.setField(dom, parts[3]); err != nil {
wingmate.Log().Error().Msgf("error parsing day of month field %#v", err) wingmate.Log().Error().Msgf("error parsing day of month field %+v", err)
continue continue
} }
if err = c.setField(month, parts[4]); err != nil { if err = c.setField(month, parts[4]); err != nil {
wingmate.Log().Error().Msgf("error parsing month field %#v", err) wingmate.Log().Error().Msgf("error parsing month field %+v", err)
continue continue
} }
if err = c.setField(dow, parts[5]); err != nil { if err = c.setField(dow, parts[5]); err != nil {
wingmate.Log().Error().Msgf("error parsing day of week field %#v", err) wingmate.Log().Error().Msgf("error parsing day of week field %+v", err)
continue continue
} }
@ -201,12 +201,12 @@ func (c *Cron) setField(field cronField, input string) error {
*cField = &specAny{} *cField = &specAny{}
} else if strings.HasPrefix(input, "*/") { } else if strings.HasPrefix(input, "*/") {
if parsed64, err = strconv.ParseUint(input[2:], 10, 8); err != nil { if parsed64, err = strconv.ParseUint(input[2:], 10, 8); err != nil {
return fmt.Errorf("error parse field %#v with input %s: %w", field, input, err) return fmt.Errorf("error parse field %+v with input %s: %w", field, input, err)
} }
parsed = uint8(parsed64) parsed = uint8(parsed64)
if fr.valid(parsed) { if !fr.valid(parsed) {
return fmt.Errorf("error parse field %#v with input %s: invalid value", field, input) return fmt.Errorf("error parse field %+v with input %s parsed to %d: invalid value", field, input, parsed)
} }
multi = make([]uint8, 0) multi = make([]uint8, 0)
current = parsed current = parsed
@ -224,12 +224,12 @@ func (c *Cron) setField(field cronField, input string) error {
multi = make([]uint8, 0) multi = make([]uint8, 0)
for _, s := range multiStr { for _, s := range multiStr {
if parsed64, err = strconv.ParseUint(s, 10, 8); err != nil { if parsed64, err = strconv.ParseUint(s, 10, 8); err != nil {
return fmt.Errorf("error parse field %#v with input %s: %w", field, input, err) return fmt.Errorf("error parse field %+v with input %s: %w", field, input, err)
} }
parsed = uint8(parsed64) parsed = uint8(parsed64)
if fr.valid(parsed) { if !fr.valid(parsed) {
return fmt.Errorf("error parse field %#v with input %s: invalid value", field, input) return fmt.Errorf("error parse field %+v with input %s: invalid value", field, input)
} }
multi = append(multi, parsed) multi = append(multi, parsed)
@ -240,12 +240,12 @@ func (c *Cron) setField(field cronField, input string) error {
} }
} else { } else {
if parsed64, err = strconv.ParseUint(input, 10, 8); err != nil { if parsed64, err = strconv.ParseUint(input, 10, 8); err != nil {
return fmt.Errorf("error parse field %#v with input %s: %w", field, input, err) return fmt.Errorf("error parse field %+v with input %s: %w", field, input, err)
} }
parsed = uint8(parsed64) parsed = uint8(parsed64)
if fr.valid(parsed) { if !fr.valid(parsed) {
return fmt.Errorf("error parse field %#v with input %s: invalid value", field, input) return fmt.Errorf("error parse field %+v with input %s: invalid value", field, input)
} }
*cField = &specExact{ *cField = &specExact{

33
docker/alpine/Dockerfile Normal file
View File

@ -0,0 +1,33 @@
FROM golang:1.21-alpine as builder
ADD . /root/wingmate
WORKDIR /root/wingmate/cmd/wingmate
RUN go build -v
WORKDIR /root/wingmate/cmd/experiment/dummy
RUN go build -v
WORKDIR /root/wingmate/cmd/experiment/starter
RUN go build -v
WORKDIR /root/wingmate/cmd/experiment/spawner
RUN go build -v
WORKDIR /root/wingmate/cmd/experiment/oneshot
RUN go build -v
FROM alpine:3.18
RUN apk add tzdata && ln -s /usr/share/zoneinfo/Australia/Sydney /etc/localtime
COPY --from=builder /root/wingmate/cmd/wingmate/wingmate /usr/local/bin/wingmate
COPY --from=builder /root/wingmate/cmd/experiment/dummy/dummy /usr/local/bin/wmdummy
COPY --from=builder /root/wingmate/cmd/experiment/starter/starter /usr/local/bin/wmstarter
COPY --from=builder /root/wingmate/cmd/experiment/oneshot/oneshot /usr/local/bin/wmoneshot
COPY --from=builder /root/wingmate/cmd/experiment/spawner/spawner /usr/local/bin/wmspawner
ADD --chmod=755 docker/alpine/entry.sh /usr/local/bin/entry.sh
ADD --chmod=755 docker/alpine/etc /etc
ENTRYPOINT [ "/usr/local/bin/entry.sh" ]
CMD [ "/usr/local/bin/wingmate" ]

7
docker/alpine/entry.sh Normal file
View File

@ -0,0 +1,7 @@
#!/bin/sh
if [ $# -gt 0 ]; then
exec "$@"
else
exec /usr/local/bin/wingmate
fi

View File

@ -0,0 +1 @@
*/5 * * * * /etc/wingmate/crontab.d/cron1.sh

View File

@ -0,0 +1,6 @@
#!/bin/sh
export WINGMATE_DUMMY_PATH=/usr/local/bin/wmdummy
export WINGMATE_LOG=/var/log/cron1.log
export WINGMATE_LOG_MESSAGE="cron executed in minute 5,10,15,20,25,30,35,40,45,50,55"
exec /usr/local/bin/wmoneshot

View File

@ -0,0 +1,4 @@
#!/bin/sh
export DUMMY_PATH=/usr/local/bin/wmdummy
exec /usr/local/bin/wmstarter

View File

@ -0,0 +1,5 @@
#!/bin/sh
export WINGMATE_ONESHOT_PATH=/usr/local/bin/wmoneshot
export WINGMATE_DUMMY_PATH=/usr/local/bin/wmdummy
exec /usr/local/bin/wmspawner

View File

@ -1,6 +1,7 @@
package init package init
import ( import (
"io"
"os/exec" "os/exec"
"sync" "sync"
"time" "time"
@ -8,19 +9,60 @@ import (
"gitea.suyono.dev/suyono/wingmate" "gitea.suyono.dev/suyono/wingmate"
) )
const (
cronTag = "cron"
)
func (i *Init) cron(wg *sync.WaitGroup, cron Cron, exitFlag <-chan any) { func (i *Init) cron(wg *sync.WaitGroup, cron Cron, exitFlag <-chan any) {
defer wg.Done() defer wg.Done()
var (
iwg *sync.WaitGroup
err error
stdout io.ReadCloser
stderr io.ReadCloser
)
ticker := time.NewTicker(time.Second * 20) ticker := time.NewTicker(time.Second * 20)
cron: cron:
for { for {
if cron.TimeToRun(time.Now()) { if cron.TimeToRun(time.Now()) {
cmd := exec.Command(cron.Command().Path()) cmd := exec.Command(cron.Command().Path())
if err := cmd.Run(); err != nil { iwg = &sync.WaitGroup{}
wingmate.Log().Error().Msgf("running cron %s error %#v", cron.Command().Path(), err)
if stdout, err = cmd.StdoutPipe(); err != nil {
wingmate.Log().Error().Str(cronTag, cron.Command().Path()).Msgf("stdout pipe: %+v", err)
goto fail
}
if stderr, err = cmd.StderrPipe(); err != nil {
wingmate.Log().Error().Str(cronTag, cron.Command().Path()).Msgf("stderr pipe: %+v", err)
_ = stdout.Close()
goto fail
}
iwg.Add(1)
go i.pipeReader(iwg, stdout, cronTag, cron.Command().Path())
iwg.Add(1)
go i.pipeReader(iwg, stderr, cronTag, cron.Command().Path())
if err := cmd.Start(); err != nil {
wingmate.Log().Error().Msgf("starting cron %s error %+v", cron.Command().Path(), err)
_ = stdout.Close()
_ = stderr.Close()
iwg.Wait()
goto fail
}
iwg.Wait()
if err = cmd.Wait(); err != nil {
wingmate.Log().Error().Str(cronTag, cron.Command().Path()).Msgf("got error when waiting: %+v", err)
} }
} }
fail:
select { select {
case <-exitFlag: case <-exitFlag:
ticker.Stop() ticker.Stop()

View File

@ -1,26 +1,76 @@
package init package init
import ( import (
"bufio"
"io"
"os/exec" "os/exec"
"sync" "sync"
"time"
"gitea.suyono.dev/suyono/wingmate" "gitea.suyono.dev/suyono/wingmate"
) )
const (
serviceTag = "service"
)
func (i *Init) service(wg *sync.WaitGroup, path Path, exitFlag <-chan any) { func (i *Init) service(wg *sync.WaitGroup, path Path, exitFlag <-chan any) {
defer wg.Done() defer wg.Done()
var ( var (
err error err error
iwg *sync.WaitGroup
stderr io.ReadCloser
stdout io.ReadCloser
failStatus bool
) )
defer func() {
wingmate.Log().Info().Str(serviceTag, path.Path()).Msg("stopped")
}()
service: service:
for { for {
failStatus = false
cmd := exec.Command(path.Path()) cmd := exec.Command(path.Path())
if err = cmd.Run(); err != nil { iwg = &sync.WaitGroup{}
if stdout, err = cmd.StdoutPipe(); err != nil {
wingmate.Log().Error().Str(serviceTag, path.Path()).Msgf("stdout pipe: %#v", err)
failStatus = true
goto fail
}
iwg.Add(1)
go i.pipeReader(iwg, stdout, serviceTag, path.Path())
if stderr, err = cmd.StderrPipe(); err != nil {
wingmate.Log().Error().Str(serviceTag, path.Path()).Msgf("stderr pipe: %#v", err)
_ = stdout.Close()
failStatus = true
goto fail
}
iwg.Add(1)
go i.pipeReader(iwg, stderr, serviceTag, path.Path())
if err = cmd.Start(); err != nil {
wingmate.Log().Error().Msgf("starting service %s error %#v", path.Path(), err) wingmate.Log().Error().Msgf("starting service %s error %#v", path.Path(), err)
failStatus = true
_ = stdout.Close()
_ = stderr.Close()
iwg.Wait()
goto fail
} }
iwg.Wait()
if err = cmd.Wait(); err != nil {
wingmate.Log().Error().Str(serviceTag, path.Path()).Msgf("got error when waiting: %+v", err)
}
fail:
if failStatus {
time.Sleep(time.Second)
failStatus = false
}
select { select {
case <-exitFlag: case <-exitFlag:
break service break service
@ -29,3 +79,18 @@ service:
} }
} }
func (i *Init) pipeReader(wg *sync.WaitGroup, pipe io.ReadCloser, tag, serviceName string) {
defer wg.Done()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
wingmate.Log().Info().Str(tag, serviceName).Msg(scanner.Text())
}
if err := scanner.Err(); err != nil {
wingmate.Log().Error().Str(tag, serviceName).Msgf("got error when reading pipe: %#v", err)
}
wingmate.Log().Info().Str(tag, serviceName).Msg("closing pipe")
}

View File

@ -5,11 +5,16 @@ import (
"os/signal" "os/signal"
"sync" "sync"
"gitea.suyono.dev/suyono/wingmate"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
func (i *Init) sighandler(wg *sync.WaitGroup, trigger chan<- any, selfExit <-chan any) { func (i *Init) sighandler(wg *sync.WaitGroup, trigger chan<- any, selfExit <-chan any) {
defer wg.Wait() defer wg.Done()
defer func() {
wingmate.Log().Warn().Msg("signal handler: exiting")
}()
isOpen := true isOpen := true
@ -23,7 +28,10 @@ signal:
switch s { switch s {
case unix.SIGTERM, unix.SIGINT: case unix.SIGTERM, unix.SIGINT:
if isOpen { if isOpen {
wingmate.Log().Info().Msg("initiating shutdown...")
close(trigger) close(trigger)
wg.Add(1)
go i.signalPump(wg, selfExit)
isOpen = false isOpen = false
} }
case unix.SIGCHLD: case unix.SIGCHLD:
@ -31,6 +39,7 @@ signal:
} }
case <-selfExit: case <-selfExit:
wingmate.Log().Warn().Msg("signal handler received completion flag")
break signal break signal
} }
} }

71
init/signal-pump.go Normal file
View File

@ -0,0 +1,71 @@
package init
import (
"sync"
"time"
"gitea.suyono.dev/suyono/wingmate"
"golang.org/x/sys/unix"
)
type status int
const (
triggered status = iota
expired
)
func (i *Init) signalPump(wg *sync.WaitGroup, selfExit <-chan any) {
defer wg.Done()
defer func() {
wingmate.Log().Info().Msg("signal pump completed")
}()
if seStatus := i.sigTermPump(time.Now(), selfExit); seStatus == triggered {
return
}
i.sigKillPump(time.Now(), selfExit)
}
func (i *Init) sigKillPump(startTime time.Time, selfExit <-chan any) {
t := time.NewTicker(time.Millisecond * 200)
defer t.Stop()
wingmate.Log().Info().Msg("start pumping SIGKILL signal")
defer func() {
wingmate.Log().Info().Msg("stop pumping SIGKILL signal")
}()
for time.Since(startTime) < time.Second {
_ = unix.Kill(-1, unix.SIGKILL)
select {
case <-t.C:
case <-selfExit:
return
}
}
}
func (i *Init) sigTermPump(startTime time.Time, selfExit <-chan any) status {
t := time.NewTicker(time.Millisecond * 100)
defer t.Stop()
wingmate.Log().Info().Msg("start pumping SIGTERM signal")
defer func() {
wingmate.Log().Info().Msg("stop pumping SIGTERM signal")
}()
for time.Since(startTime) < time.Duration(time.Second*4) {
_ = unix.Kill(-1, unix.SIGTERM)
select {
case <-t.C:
case <-selfExit:
return triggered
}
}
return expired
}

View File

@ -3,7 +3,9 @@ package init
import ( import (
"errors" "errors"
"sync" "sync"
"time"
"gitea.suyono.dev/suyono/wingmate"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@ -13,24 +15,42 @@ func (i *Init) waiter(wg *sync.WaitGroup, runningFlag <-chan any, sigHandlerFlag
// pid int // pid int
err error err error
running bool running bool
flagged bool
) )
defer wg.Done()
defer func() {
wingmate.Log().Info().Msg("waiter exiting...")
}()
running = true running = true
flagged = true
wait: wait:
for { for {
if running {
select { select {
case <-runningFlag: case <-runningFlag:
wingmate.Log().Info().Msg("waiter received shutdown signal...")
running = false running = false
default: default:
} }
}
if _, err = unix.Wait4(-1, &ws, 0, nil); err != nil { if _, err = unix.Wait4(-1, &ws, 0, nil); err != nil {
if errors.Is(err, unix.ECHILD) { if errors.Is(err, unix.ECHILD) {
if !running { if !running {
if flagged {
close(sigHandlerFlag) close(sigHandlerFlag)
flagged = false
wingmate.Log().Warn().Msg("waiter: inner flag")
}
wingmate.Log().Warn().Msg("waiter: no child left")
break wait break wait
} }
} }
wingmate.Log().Warn().Msgf("Wait4 returns error: %+v", err)
time.Sleep(time.Millisecond * 100)
} }
} }
} }

View File

@ -1,9 +1,15 @@
package wingmate package wingmate
import ( import (
"io"
"time"
"gitea.suyono.dev/suyono/wingmate/logger" "gitea.suyono.dev/suyono/wingmate/logger"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"io" )
const (
timeTag = "time"
) )
var ( var (
@ -29,13 +35,28 @@ func Log() logger.Log {
} }
func (w *wrapper) Info() logger.Content { func (w *wrapper) Info() logger.Content {
return w.log.Info() return (*eventWrapper)(w.log.Info().Time(timeTag, time.Now()))
} }
func (w *wrapper) Warn() logger.Content { func (w *wrapper) Warn() logger.Content {
return w.log.Warn() return (*eventWrapper)(w.log.Warn().Time(timeTag, time.Now()))
} }
func (w *wrapper) Error() logger.Content { func (w *wrapper) Error() logger.Content {
return w.log.Error() return (*eventWrapper)(w.log.Error().Time(timeTag, time.Now()))
}
type eventWrapper zerolog.Event
func (w *eventWrapper) Msg(msg string) {
(*zerolog.Event)(w).Msg(msg)
}
func (w *eventWrapper) Msgf(format string, data ...any) {
(*zerolog.Event)(w).Msgf(format, data...)
}
func (w *eventWrapper) Str(key, value string) logger.Content {
rv := (*zerolog.Event)(w).Str(key, value)
return (*eventWrapper)(rv)
} }

View File

@ -3,6 +3,7 @@ package logger
type Content interface { type Content interface {
Msg(string) Msg(string)
Msgf(string, ...any) Msgf(string, ...any)
Str(string, string) Content
} }
type Level interface { type Level interface {