Looks good; it should be test-worthy.

Suyono 2025-05-10 11:33:30 +10:00
parent 0c1a9fb7e7
commit 9eeda34e19
3 changed files with 90 additions and 67 deletions

View File

@ -51,36 +51,54 @@ func initUDP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.
         msgChan: make(chan udpMessage),
         bufPool: bp,
     }
-    defer wg.Done()
-    go func(wg *sync.WaitGroup, ctx context.Context, cfg abstract.UDPConnectionConfig, backend *backendUDP) {
-        defer wg.Done()
-        incoming := backend.msgChan
-    readIncoming:
-        for {
-            select {
-            case <-ctx.Done():
-                break readIncoming
-            case m := <-incoming:
-                backend.handle(wg, ctx, m)
-            }
-        }
-    }(wg, ctx, cfg, backend)
+    wg.Add(1)
+    go handlerSpawn(wg, ctx, backend)
+    wg.Add(1)
+    go handlerSpawn(wg, ctx, backend)
+    wg.Add(1)
+    go handlerSpawn(wg, ctx, backend)
+    wg.Add(1)
+    go handlerSpawn(wg, ctx, backend)
     return backend
 }
+
+func handlerSpawn(wg *sync.WaitGroup, ctx context.Context, backend *backendUDP) {
+    defer wg.Done()
+    incoming := backend.msgChan
+readIncoming:
+    for {
+        select {
+        case <-ctx.Done():
+            break readIncoming
+        case m := <-incoming:
+            backend.handle(wg, ctx, m)
+        }
+    }
+}
 func (b *backendUDP) findRel(clientAddr string) (udpRel, bool) {
     b.relMtx.Lock()
     defer b.relMtx.Unlock()
     rel, ok := b.relations[clientAddr]
     if ok && rel.expiry.Before(time.Now()) {
+        // expired
         if rel.ctxCancel != nil {
             rel.ctxCancel()
         }
-        _ = rel.backend.Close()
+        // cleans up the connection; do it in a separate goroutine in case the close operation takes some time
+        go func(c net.PacketConn) {
+            _ = c.Close()
+        }(rel.backend)
         delete(b.relations, clientAddr)
         return rel, false
     }
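Review note on the hunk above: the refactor swaps one anonymous reader goroutine for four identical consumers competing on `backend.msgChan`, with `wg.Add(1)` issued before each spawn so `Wait` can never observe a zero counter while a worker is still starting. A minimal self-contained sketch of the same fan-out pattern; `worker` and `in` are illustrative names, not code from this repo:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker mirrors the shape of handlerSpawn: it consumes from a shared
// channel and exits when the context is cancelled.
func worker(wg *sync.WaitGroup, ctx context.Context, id int, in <-chan string) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case m := <-in:
			fmt.Printf("worker %d: %s\n", id, m)
		}
	}
}

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan string)

	// Add before spawning, exactly as the diff does, so the WaitGroup
	// never races the goroutine's Done.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go worker(&wg, ctx, i, in)
	}

	in <- "hello"
	in <- "world"
	time.Sleep(50 * time.Millisecond) // let the prints land (sketch only)
	cancel()
	wg.Wait()
}
```

With four competing consumers, datagrams from the same client can now be handled out of order; UDP makes no ordering promise, and `findRel` serializes access to the relation map under `relMtx`, so that seems acceptable here.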
@ -115,9 +133,11 @@ func (b *backendUDP) createRelSend(clientAddr string, buf []byte) (udpRel, error
     }
     if _, err = udpConn.WriteTo(buf, b.cfg.BackendAddr()); err != nil {
-        //TODO: I think I need to fix this. This error handling is odd.
-        _ = udpConn.Close()
-        return rel, fmt.Errorf("create udp relation and send message: write udp: %w", err)
+        if errors.Is(err, net.ErrClosed) {
+            return rel, fmt.Errorf("create udp relation and send message: write udp: %w", err)
+        }
+        log.Error().Caller().Err(err).Str(CONNECTION, b.cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("create udp relation & send")
     }
     rel.backend = udpConn
@ -137,7 +157,7 @@ func (b *backendUDP) relSend(rel udpRel, buf []byte) error {
     }
     if len(buf) != n {
-        log.Error().Msg("relSend mismatch size")
+        log.Error().Caller().Msg("relSend mismatch size")
     }
     return nil
@ -150,15 +170,13 @@ func (b *backendUDP) handle(wg *sync.WaitGroup, ctx context.Context, msg udpMess
         err error
     )
+    defer b.bufPool.Put(msg.buf)
     if rel, ok = b.findRel(msg.addr); !ok {
         if rel, err = b.createRelSend(msg.addr, msg.buf); err != nil {
-            b.bufPool.Put(msg.buf)
-            log.Error().Err(err).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("establish relation with udp backend")
+            log.Error().Caller().Err(err).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("establish relation with udp backend")
             return
         }
-        b.bufPool.Put(msg.buf)
         rel.ctx, rel.ctxCancel = context.WithCancel(ctx)
         b.addUpdateRel(msg.addr, rel)
@ -168,13 +186,10 @@ func (b *backendUDP) handle(wg *sync.WaitGroup, ctx context.Context, msg udpMess
     }
     if err = b.relSend(rel, msg.buf); err != nil {
-        b.bufPool.Put(msg.buf)
-        log.Error().Err(err).Msg("handle: send for existing relation")
+        log.Error().Caller().Err(err).Msg("handle: send for existing relation")
         return
     }
-    b.bufPool.Put(msg.buf)
     rel.expiry = time.Now().Add(b.cfg.Timeout())
     b.addUpdateRel(rel.clientAddr, rel)
 }
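The buffer lifecycle above is the interesting part: `Send` gets a buffer from the pool and hands ownership across `msgChan`, and `handle` now releases it in a single `defer` instead of three scattered `Put` calls, so every exit path is covered. A sketch of that ownership hand-off using a plain `sync.Pool`; `BufPool`'s actual implementation isn't in this diff, so the pool here is an assumption:

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool stands in for the repo's BufPool, whose implementation is not
// part of this diff; sync.Pool is only an assumption.
var bufPool = sync.Pool{
	New: func() any { return make([]byte, 64*1024) },
}

type udpMsg struct{ buf []byte }

// send models backendUDP.Send: Get, copy, then transfer ownership over
// the channel. After the send, this side must not touch buf again.
func send(ch chan<- udpMsg, p []byte) {
	buf := bufPool.Get().([]byte)
	n := copy(buf, p)
	ch <- udpMsg{buf: buf[:n]}
}

// handle models backendUDP.handle: one deferred Put covers every
// return path, which is exactly what the hunk above switches to.
func handle(m udpMsg) {
	defer bufPool.Put(m.buf[:cap(m.buf)])
	fmt.Printf("handled %q\n", m.buf)
}

func main() {
	ch := make(chan udpMsg, 1)
	send(ch, []byte("ping"))
	handle(<-ch)
}
```

One thing to double-check: in the `Send` hunk below, the `Put` on the short-copy branch is removed, so that early return no longer releases the buffer it fetched. Depending on how `BufPool` is implemented, that's either a leaked slot or just extra garbage.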
@ -184,11 +199,10 @@ func (b *backendUDP) Send(ctx context.Context, addr string, p []byte) error {
         n int
     )
-    buf := b.bufPool.Get()
+    buf := b.bufPool.Get() // the buf will be released in handle
     n = copy(buf, p)
     if len(p) != n {
-        b.bufPool.Put(buf)
         return fmt.Errorf("send udp message to handler: failed to write complete message")
     }
@ -198,6 +212,7 @@ func (b *backendUDP) Send(ctx context.Context, addr string, p []byte) error {
         buf: buf[:n],
     }:
     case <-ctx.Done():
+        return context.Canceled
     }
     return nil
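Returning `context.Canceled` from the `ctx.Done` arm (instead of silently falling through to `return nil`) lets callers distinguish "message queued" from "gave up because we're shutting down"; `RunUDP` below tests exactly this with `errors.Is`. The pattern in isolation, with illustrative names:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// trySend mirrors the select in backendUDP.Send: deliver the message,
// or give up with a distinguishable error once the context is done.
func trySend(ctx context.Context, ch chan<- string, m string) error {
	select {
	case ch <- m:
		return nil
	case <-ctx.Done():
		return context.Canceled // callers check this to stop their loops
	}
}

func main() {
	ch := make(chan string) // unbuffered and never received from: the send blocks
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(trySend(ctx, ch, "hello")) // prints "context canceled"
}
```

Returning `ctx.Err()` would additionally distinguish cancellation from deadline expiry, but the hard-coded `context.Canceled` is what the `errors.Is` check in `RunUDP` matches against.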
@ -213,29 +228,29 @@ func (b *backendUDP) udpBackend2Client(wg *sync.WaitGroup, rel udpRel) {
         clientAddr string
     )
-    buf := make([]byte, BUF_SIZE)
+    buf := b.bufPool.Get()
+    defer b.bufPool.Put(buf)
+
+    go func(ctx context.Context, backend net.PacketConn) {
+        <-ctx.Done()
+        _ = backend.Close()
+    }(rel.ctx, rel.backend)
+
 udpBackendLoop:
     for {
         if n, _, err = rel.backend.ReadFrom(buf); err != nil {
-            select {
-            case <-rel.ctx.Done():
-                break udpBackendLoop
-            default:
-            }
-            if errors.Is(err, net.ErrClosed) {
+            if errors.Is(err, net.ErrClosed) { //TODO: implement this error handling in all socket read/write operations
                 rel.ctxCancel()
                 break udpBackendLoop
             }
-            log.Error().Err(err).Str(CONNECTION, rel.clientAddr).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("udpBackend2Client: read from udp")
+            log.Error().Caller().Err(err).Str(CONNECTION, rel.clientAddr).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("udpBackend2Client: read from udp")
             continue udpBackendLoop
         }
         clientAddr = rel.clientAddr
         if rel, ok = b.findRel(clientAddr); !ok {
-            log.Error().Msg("relation not found")
+            log.Error().Caller().Msg("relation not found")
             return
         }
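The new goroutine that waits on `rel.ctx.Done()` and closes the backend socket replaces the old non-blocking `select` probe: a blocked `ReadFrom` can't observe a context, so the only way to interrupt it is to close the connection, which makes it return `net.ErrClosed`. That's why the read path now treats `errors.Is(err, net.ErrClosed)` as the orderly-shutdown signal. A stand-alone sketch of the pattern:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"
)

func main() {
	conn, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// ReadFrom has no context parameter, so cancellation must be delivered
	// by closing the socket; the blocked read then returns net.ErrClosed.
	go func(c net.PacketConn) {
		<-ctx.Done()
		_ = c.Close()
	}(conn)

	buf := make([]byte, 1500)
	for {
		if _, _, err := conn.ReadFrom(buf); err != nil {
			if errors.Is(err, net.ErrClosed) {
				fmt.Println("orderly shutdown:", err)
				return
			}
			continue // transient error: a real server would log and keep going
		}
		// process the datagram here
	}
}
```

An alternative is `SetReadDeadline` plus periodic context checks, but close-on-cancel is simpler when the socket's lifetime equals the relation's.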
@ -246,11 +261,19 @@ udpBackendLoop:
     }
     if wn, err = b.client.WriteTo(buf[:n], rel); err != nil {
-        //TODO: error handling
+        // in case of error, never close b.client: it's a shared PacketConn.
+        // All UDP relations use a shared connection/socket back to the client.
+        if errors.Is(err, net.ErrClosed) {
+            rel.ctxCancel()
+            break udpBackendLoop
+        }
+        log.Error().Caller().Err(err).Str(CONNECTION, rel.clientAddr).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("udpBackend2Client: write to client")
+        continue udpBackendLoop
     }
     if wn != n {
-        //TODO: error when mismatch length
+        log.Warn().Caller().Str(CONNECTION, rel.clientAddr).Str(DIRECTION, BACKEND_TO_CLIENT).Msgf("failed to write complete message: %d vs %d", wn, n)
     }
     rel.expiry = time.Now().Add(b.cfg.Timeout())

View File

@ -141,7 +141,7 @@ func RunTCP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.T
     accepted = make(chan net.Conn)
     go func(ctx context.Context, cfg abstract.TCPConnectionConfig, listener net.Listener, acceptChan chan<- net.Conn) {
-        defer log.Info().Msgf("go routine for accepting connection quited: %s", cfg.Name())
+        defer log.Info().Caller().Msgf("go routine for accepting connection quited: %s", cfg.Name())
         var (
             err error
             c   net.Conn
@ -151,10 +151,10 @@ func RunTCP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.T
     for {
         if c, err = listener.Accept(); err != nil {
             if errors.Is(err, net.ErrClosed) {
-                log.Error().Err(err).Msg("stop accepting new connection")
+                log.Error().Caller().Err(err).Msg("stop accepting new connection")
                 break acceptIncoming
             }
-            log.Error().Err(err).Msg("accept TCP connection")
+            log.Error().Caller().Err(err).Msg("accept TCP connection")
             continue
         }
@ -187,7 +187,7 @@ func makeTCPConnection(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg
     )
     if backend, err = net.Dial("tcp", cfg.Backend()); err != nil {
-        log.Error().Err(err).Str(CONNECTION, cfg.Name()).Msg("connection to backend failed")
+        log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Msg("connection to backend failed")
         return
     }
@ -225,7 +225,7 @@ func tcpBackend2Client(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg
 backendRead:
     for readRetryCounter.IsContinue() {
         if n, err = backend.Read(buf); err != nil {
-            log.Error().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read error")
+            log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read error")
             if errors.Is(err, net.ErrClosed) && n == 0 {
                 cancel()
                 break backendRead
@ -243,7 +243,7 @@ backendRead:
 backendWrite:
     for writeRetryCounter.IsContinue() {
         if wn, err = client.Write(buf[:n]); err != nil {
-            log.Error().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read error")
+            log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read error")
             if errors.Is(err, net.ErrClosed) {
                 cancel()
                 break backendRead
@ -255,13 +255,13 @@ backendRead:
             break backendWrite
         }
         if writeRetryCounter.MaxCounterExceeded() {
-            log.Error().Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp write retry exceeded")
+            log.Error().Caller().Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp write retry exceeded")
             cancel()
             break backendRead
         }
         if wn != n {
-            log.Error().Err(fmt.Errorf("mismatch length between read and write")).Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read problem")
+            log.Error().Caller().Err(fmt.Errorf("mismatch length between read and write")).Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read problem")
             cancel()
             break backendRead
         }
@ -273,7 +273,7 @@ backendRead:
         }
     }
     if readRetryCounter.MaxCounterExceeded() {
-        log.Error().Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read retry exceeded")
+        log.Error().Caller().Str(CONNECTION, cfg.Name()).Str(DIRECTION, BACKEND_TO_CLIENT).Msg("tcp read retry exceeded")
         cancel()
     }
 }
@ -299,7 +299,7 @@ func tcpClient2Backend(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg
 clientRead:
     for readRetryCounter.IsContinue() {
         if n, err = client.Read(buf); err != nil {
-            log.Error().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp read error")
+            log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp read error")
             if errors.Is(err, net.ErrClosed) && n == 0 {
                 cancel()
                 break clientRead
@ -317,7 +317,7 @@ clientRead:
 clientWrite:
     for writeRetryCounter.IsContinue() {
         if wn, err = backend.Write(buf[:n]); err != nil {
-            log.Error().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp write error")
+            log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp write error")
             if errors.Is(err, net.ErrClosed) {
                 cancel()
                 break clientRead
@ -329,7 +329,7 @@ clientRead:
             break clientWrite
         }
         if writeRetryCounter.MaxCounterExceeded() {
-            log.Error().Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp write retry exceeded")
+            log.Error().Caller().Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp write retry exceeded")
             cancel()
             break clientRead
         }
@ -347,7 +347,7 @@ clientRead:
         }
     }
     if readRetryCounter.MaxCounterExceeded() {
-        log.Error().Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp read retry exceeded")
+        log.Error().Caller().Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("tcp read retry exceeded")
         cancel()
     }
 }
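All four TCP pump loops above hinge on `readRetryCounter` / `writeRetryCounter`, whose type isn't shown in this commit. Purely as a reading aid, a hypothetical minimal shape consistent with the calls `IsContinue` and `MaxCounterExceeded` (not the repo's actual implementation):

```go
package main

import "fmt"

// retryCounter is a guess at the type behind readRetryCounter and
// writeRetryCounter; the real implementation is outside this diff.
type retryCounter struct {
	count, max int
}

// IsContinue consumes one attempt and reports whether the loop may run.
func (r *retryCounter) IsContinue() bool {
	r.count++
	return r.count <= r.max
}

// MaxCounterExceeded reports whether the loop stopped because the
// budget ran out (as opposed to an explicit break).
func (r *retryCounter) MaxCounterExceeded() bool {
	return r.count > r.max
}

// Reset would be called after a success so only consecutive failures count.
func (r *retryCounter) Reset() { r.count = 0 }

func main() {
	rc := retryCounter{max: 3}
	for rc.IsContinue() {
		fmt.Println("attempt", rc.count)
	}
	fmt.Println("exceeded:", rc.MaxCounterExceeded()) // true
}
```

If the real counter doesn't reset on success, long-lived connections would eventually exhaust it, so presumably something like `Reset` runs after each successful read/write in the lines elided from this diff.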
@ -373,27 +373,27 @@ func RunUDP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.U
     buf = bp.Get()
     defer bp.Put(buf)
+
+    // wait for context cancel/done, then close the socket; this is the exit mechanism
+    go func(c net.PacketConn, ctx context.Context) {
+        <-ctx.Done()
+        _ = c.Close()
+    }(client, ctx)
+
 udpReadLoop:
     for {
         if n, addr, err = client.ReadFrom(buf); err != nil {
-            select {
-            case <-ctx.Done():
-                break udpReadLoop
-            default:
-            }
-            log.Error().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("udp read error")
+            log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("udp read error")
+            if errors.Is(err, net.ErrClosed) {
+                return
+            }
             continue udpReadLoop
         }
         if err = backend.Send(ctx, addr.String(), buf[:n]); err != nil {
-            log.Error().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("send udp message")
-            //TODO: continue udpReadLoop
+            if errors.Is(err, context.Canceled) {
+                return
+            }
+            log.Error().Caller().Err(err).Str(CONNECTION, cfg.Name()).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("send udp message")
+            select {
+            case <-ctx.Done():
+                break udpReadLoop
+            default:
+            }
         }
     }
 }

View File

@ -23,6 +23,6 @@ import (
 func CollectEnv() {
     if err := viper.BindEnv("config", "NETBOUNCE_CONFIG"); err != nil {
-        log.Warn().Err(err).Msg("failed to bind env for config")
+        log.Warn().Caller().Err(err).Msg("failed to bind env for config")
     }
 }
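Stepping back, the bulk of this commit threads `.Caller()` into every zerolog event, which matters because several loops emit identical messages ("tcp read error" appears in both pumps); the caller field disambiguates them by file:line. For reference, the per-event form used here versus the global opt-in:

```go
package main

import (
	"errors"

	"github.com/rs/zerolog/log"
)

func main() {
	// Per-event, as this commit does: only the tagged call sites pay
	// the cost of resolving the caller.
	log.Error().Caller().Err(errors.New("boom")).Str("conn", "example").
		Msg("tcp read error")
	// => {"level":"error","caller":".../main.go:13","error":"boom","conn":"example","message":"tcp read error"}

	// Global alternative: opt in once instead of editing every call site.
	logger := log.With().Caller().Logger()
	logger.Warn().Msg("every event from this logger carries file:line")
}
```

`log.With().Caller().Logger()` would have made this a one-line change, at the cost of computing the caller for every event rather than only on the tagged paths; per-event tagging, as done here, keeps the hot path clean.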