WIP: udp backend
This commit is contained in:
parent
7f69553ead
commit
df828e2e9e
@ -16,6 +16,8 @@ type udpRel struct {
|
||||
backend net.PacketConn
|
||||
clientAddr string
|
||||
expiry time.Time
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
}
|
||||
|
||||
type backendUDP struct {
|
||||
@ -32,6 +34,14 @@ type udpMessage struct {
|
||||
buf *bytes.Buffer
|
||||
}
|
||||
|
||||
func (r udpRel) Network() string {
|
||||
return "udp"
|
||||
}
|
||||
|
||||
func (r udpRel) String() string {
|
||||
return r.clientAddr
|
||||
}
|
||||
|
||||
func initUDP(wg *sync.WaitGroup, ctx context.Context, cfg abstract.UDPConnectionConfig, client net.PacketConn) *backendUDP {
|
||||
backend := &backendUDP{
|
||||
relations: make(map[string]udpRel),
|
||||
@ -56,7 +66,7 @@ func initUDP(wg *sync.WaitGroup, ctx context.Context, cfg abstract.UDPConnection
|
||||
case <-ctx.Done():
|
||||
break readIncoming
|
||||
case m := <-incoming:
|
||||
backend.handle(m)
|
||||
backend.handle(wg, ctx, m)
|
||||
}
|
||||
}
|
||||
}(wg, ctx, cfg, backend)
|
||||
@ -70,6 +80,11 @@ func (b *backendUDP) findRel(clientAddr string) (udpRel, bool) {
|
||||
|
||||
rel, ok := b.relations[clientAddr]
|
||||
if ok && rel.expiry.Before(time.Now()) {
|
||||
if rel.ctxCancel != nil {
|
||||
rel.ctxCancel()
|
||||
}
|
||||
|
||||
_ = rel.backend.Close()
|
||||
delete(b.relations, clientAddr)
|
||||
return rel, false
|
||||
}
|
||||
@ -77,7 +92,7 @@ func (b *backendUDP) findRel(clientAddr string) (udpRel, bool) {
|
||||
return rel, ok
|
||||
}
|
||||
|
||||
func (b *backendUDP) addRel(clientAddr string, rel udpRel) {
|
||||
func (b *backendUDP) addUpdateRel(clientAddr string, rel udpRel) {
|
||||
b.relMtx.Lock()
|
||||
defer b.relMtx.Unlock()
|
||||
|
||||
@ -115,7 +130,7 @@ func (b *backendUDP) createRelSend(clientAddr string, buf []byte) (udpRel, error
|
||||
return rel, nil
|
||||
}
|
||||
|
||||
func (b *backendUDP) handle(msg udpMessage) {
|
||||
func (b *backendUDP) handle(wg *sync.WaitGroup, ctx context.Context, msg udpMessage) {
|
||||
var (
|
||||
rel udpRel
|
||||
ok bool
|
||||
@ -127,14 +142,16 @@ func (b *backendUDP) handle(msg udpMessage) {
|
||||
log.Error().Err(err).Str(DIRECTION, CLIENT_TO_BACKEND).Msg("establish relation with udp backend")
|
||||
}
|
||||
|
||||
b.addRel(msg.addr, rel)
|
||||
rel.ctx, rel.ctxCancel = context.WithCancel(ctx)
|
||||
b.addUpdateRel(msg.addr, rel)
|
||||
|
||||
wg.Add(1)
|
||||
go b.udpBackend2Client(wg, rel)
|
||||
return
|
||||
}
|
||||
|
||||
_ = rel
|
||||
// if rel.expiry.Before(time.Now()) {
|
||||
// //TODO: handle expiry
|
||||
// }
|
||||
//TODO: handle existing valid client
|
||||
|
||||
}
|
||||
|
||||
@ -166,3 +183,52 @@ func (b *backendUDP) Send(ctx context.Context, addr string, p []byte) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *backendUDP) udpBackend2Client(wg *sync.WaitGroup, rel udpRel) {
|
||||
defer wg.Done()
|
||||
|
||||
var (
|
||||
n, wn int
|
||||
err error
|
||||
ok bool
|
||||
clientAddr string
|
||||
)
|
||||
|
||||
buf := make([]byte, BUF_SIZE)
|
||||
|
||||
udpBackendLoop:
|
||||
for {
|
||||
if n, _, err = rel.backend.ReadFrom(buf); err != nil {
|
||||
//TODO: error handling
|
||||
}
|
||||
|
||||
clientAddr = rel.clientAddr
|
||||
if rel, ok = b.findRel(clientAddr); !ok {
|
||||
log.Error().Msg("relation not found")
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-rel.ctx.Done():
|
||||
break udpBackendLoop
|
||||
default:
|
||||
}
|
||||
|
||||
if wn, err = b.client.WriteTo(buf[:n], rel); err != nil {
|
||||
//TODO: error handling
|
||||
}
|
||||
|
||||
if wn != n {
|
||||
//TODO: error when mismatch length
|
||||
}
|
||||
|
||||
rel.expiry = time.Now().Add(b.cfg.Timeout())
|
||||
b.addUpdateRel(clientAddr, rel)
|
||||
|
||||
select {
|
||||
case <-rel.ctx.Done():
|
||||
break udpBackendLoop
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user