wip: pipe stdout & stderr

This commit is contained in:
Suyono 2023-12-09 02:44:06 +00:00
parent d9d1fe72d4
commit 9128503da1
8 changed files with 217 additions and 6 deletions

2
cmd/experiment/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/dummy/dummy
/starter/starter

View File

@ -0,0 +1,18 @@
package main
import (
"fmt"
"log"
"time"
)
func main() {
for i := 0; i < 10; i++ {
log.Println("using log.Println")
fmt.Println("using fmt.Println")
time.Sleep(time.Millisecond * 500)
}
log.Println("log: finishing up")
fmt.Println("fmt: finishing up")
}

View File

@ -0,0 +1,61 @@
package main
import (
"bufio"
"io"
"log"
"os/exec"
"sync"
)
const (
DummyPath = "/workspaces/wingmate/cmd/experiment/dummy/dummy"
)
func main() {
var (
stdout io.ReadCloser
stderr io.ReadCloser
wg *sync.WaitGroup
err error
)
cmd := exec.Command(DummyPath)
if stdout, err = cmd.StdoutPipe(); err != nil {
log.Panic(err)
}
if stderr, err = cmd.StderrPipe(); err != nil {
log.Panic(err)
}
wg = &sync.WaitGroup{}
wg.Add(2)
go pulley(wg, stdout, "stdout")
go pulley(wg, stderr, "stderr")
if err = cmd.Start(); err != nil {
log.Panic(err)
}
wg.Wait()
if err = cmd.Wait(); err != nil {
log.Printf("got error when Waiting for child process: %#v\n", err)
}
}
func pulley(wg *sync.WaitGroup, src io.ReadCloser, srcName string) {
defer wg.Done()
scanner := bufio.NewScanner(src)
for scanner.Scan() {
log.Printf("coming out from %s: %s\n", srcName, scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Printf("got error whean reading from %s: %#v\n", srcName, err)
}
log.Printf("closing %s...\n", srcName)
_ = src.Close()
}

View File

@ -1,26 +1,69 @@
package init
import (
"bufio"
"io"
"os/exec"
"sync"
"time"
"gitea.suyono.dev/suyono/wingmate"
)
const (
serviceTag = "service"
)
func (i *Init) service(wg *sync.WaitGroup, path Path, exitFlag <-chan any) {
defer wg.Done()
var (
err error
err error
iwg *sync.WaitGroup
stderr io.ReadCloser
stdout io.ReadCloser
failStatus bool
)
failStatus = false
service:
for {
cmd := exec.Command(path.Path())
if err = cmd.Run(); err != nil {
iwg = &sync.WaitGroup{}
if stdout, err = cmd.StdoutPipe(); err != nil {
wingmate.Log().Error().Str(serviceTag, path.Path()).Msgf("stdout pipe: %#v", err)
failStatus = true
goto fail
}
iwg.Add(1)
go i.pipeReader(iwg, stdout, path.Path())
if stderr, err = cmd.StderrPipe(); err != nil {
wingmate.Log().Error().Str(serviceTag, path.Path()).Msgf("stderr pipe: %#v", err)
_ = stdout.Close()
failStatus = true
goto fail
}
iwg.Add(1)
go i.pipeReader(iwg, stderr, path.Path())
if err = cmd.Start(); err != nil {
wingmate.Log().Error().Msgf("starting service %s error %#v", path.Path(), err)
failStatus = true
goto fail
}
iwg.Wait()
if err = cmd.Wait(); err != nil {
wingmate.Log().Error().Str(serviceTag, path.Path()).Msgf("got error when waiting: %#v", err)
}
fail:
if failStatus {
time.Sleep(time.Second)
failStatus = false
}
select {
case <-exitFlag:
break service
@ -29,3 +72,16 @@ service:
}
}
func (i *Init) pipeReader(wg *sync.WaitGroup, pipe io.ReadCloser, serviceName string) {
defer wg.Done()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
wingmate.Log().Info().Str(serviceTag, serviceName).Msg(scanner.Text())
}
if err := scanner.Err(); err != nil {
wingmate.Log().Error().Str(serviceTag, serviceName).Msgf("got error when reading pipe: %#v", err)
}
}

View File

@ -24,6 +24,8 @@ signal:
case unix.SIGTERM, unix.SIGINT:
if isOpen {
close(trigger)
wg.Add(1)
go i.signalPump(wg, selfExit)
isOpen = false
}
case unix.SIGCHLD:

55
init/signal-pump.go Normal file
View File

@ -0,0 +1,55 @@
package init
import (
"sync"
"time"
"golang.org/x/sys/unix"
)
type status int
const (
triggered status = iota
expired
)
func (i *Init) signalPump(wg *sync.WaitGroup, selfExit <-chan any) {
if seStatus := i.sigTermPump(time.Now(), selfExit); seStatus == triggered {
return
}
i.sigKillPump(time.Now(), selfExit)
}
func (i *Init) sigKillPump(startTime time.Time, selfExit <-chan any) {
t := time.NewTicker(time.Millisecond * 200)
defer t.Stop()
for time.Since(startTime) < time.Second {
_ = unix.Kill(-1, unix.SIGKILL)
select {
case <-t.C:
case <-selfExit:
return
}
}
}
func (i *Init) sigTermPump(startTime time.Time, selfExit <-chan any) status {
t := time.NewTicker(time.Millisecond * 100)
defer t.Stop()
for time.Since(startTime) < time.Duration(time.Second*4) {
_ = unix.Kill(-1, unix.SIGTERM)
select {
case <-t.C:
case <-selfExit:
return triggered
}
}
return expired
}

View File

@ -1,9 +1,10 @@
package wingmate
import (
"io"
"gitea.suyono.dev/suyono/wingmate/logger"
"github.com/rs/zerolog"
"io"
)
var (
@ -29,13 +30,28 @@ func Log() logger.Log {
}
func (w *wrapper) Info() logger.Content {
return w.log.Info()
return (*eventWrapper)(w.log.Info())
}
func (w *wrapper) Warn() logger.Content {
return w.log.Warn()
return (*eventWrapper)(w.log.Warn())
}
func (w *wrapper) Error() logger.Content {
return w.log.Error()
return (*eventWrapper)(w.log.Error())
}
type eventWrapper zerolog.Event
func (w *eventWrapper) Msg(msg string) {
(*zerolog.Event)(w).Msg(msg)
}
func (w *eventWrapper) Msgf(format string, data ...any) {
(*zerolog.Event)(w).Msgf(format, data...)
}
func (w *eventWrapper) Str(key, value string) logger.Content {
rv := (*zerolog.Event)(w).Str(key, value)
return (*eventWrapper)(rv)
}

View File

@ -3,6 +3,7 @@ package logger
type Content interface {
Msg(string)
Msgf(string, ...any)
Str(string, string) Content
}
type Level interface {