WIP: replace the backend address related fields/API from udp config, move them to backend udp
This commit is contained in:
@@ -21,12 +21,13 @@ type udpRel struct {
|
||||
}
|
||||
|
||||
type backendUDP struct {
|
||||
relations map[string]udpRel
|
||||
relMtx *sync.Mutex
|
||||
client net.PacketConn
|
||||
cfg abstract.UDPConnectionConfig
|
||||
bufPool *BufPool
|
||||
msgChan chan udpMessage
|
||||
relations map[string]udpRel
|
||||
relMtx *sync.Mutex
|
||||
client net.PacketConn
|
||||
cfg abstract.UDPConnectionConfig
|
||||
bufPool *BufPool
|
||||
msgChan chan udpMessage
|
||||
backendAddr *net.UDPAddr
|
||||
}
|
||||
|
||||
type udpMessage struct {
|
||||
@@ -42,17 +43,28 @@ func (r udpRel) String() string {
|
||||
return r.clientAddr
|
||||
}
|
||||
|
||||
func initUDP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.UDPConnectionConfig, client net.PacketConn) *backendUDP {
|
||||
func initUDP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.UDPConnectionConfig, client net.PacketConn) (*backendUDP, error) {
|
||||
var (
|
||||
addr *net.UDPAddr
|
||||
err error
|
||||
)
|
||||
|
||||
if addr, err = net.ResolveUDPAddr("udp", cfg.Backend()); err != nil {
|
||||
return nil, fmt.Errorf("failed to resolve UDP address: %v", err)
|
||||
}
|
||||
|
||||
backend := &backendUDP{
|
||||
relations: make(map[string]udpRel),
|
||||
relMtx: new(sync.Mutex),
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
msgChan: make(chan udpMessage),
|
||||
bufPool: bp,
|
||||
relations: make(map[string]udpRel),
|
||||
relMtx: new(sync.Mutex),
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
msgChan: make(chan udpMessage),
|
||||
bufPool: bp,
|
||||
backendAddr: addr,
|
||||
}
|
||||
defer wg.Done()
|
||||
|
||||
//TODO: make the number of handler spawn configurable
|
||||
wg.Add(1)
|
||||
handlerSpawn(wg, ctx, backend)
|
||||
|
||||
@@ -65,7 +77,7 @@ func initUDP(wg *sync.WaitGroup, ctx context.Context, bp *BufPool, cfg abstract.
|
||||
wg.Add(1)
|
||||
handlerSpawn(wg, ctx, backend)
|
||||
|
||||
return backend
|
||||
return backend, nil
|
||||
}
|
||||
|
||||
func handlerSpawn(wg *sync.WaitGroup, ctx context.Context, backend *backendUDP) {
|
||||
@@ -115,7 +127,7 @@ func (b *backendUDP) addUpdateRel(clientAddr string, rel udpRel) {
|
||||
|
||||
func (b *backendUDP) createRelSend(clientAddr string, buf []byte) (udpRel, error) {
|
||||
var (
|
||||
udpAddr *net.UDPAddr
|
||||
laddr *net.UDPAddr
|
||||
udpConn *net.UDPConn
|
||||
err error
|
||||
)
|
||||
@@ -124,15 +136,15 @@ func (b *backendUDP) createRelSend(clientAddr string, buf []byte) (udpRel, error
|
||||
clientAddr: clientAddr,
|
||||
}
|
||||
|
||||
if udpAddr, err = net.ResolveUDPAddr("udp", b.cfg.Backend()); err != nil {
|
||||
return rel, fmt.Errorf("create udp relation and send message: resolve udp addr: %w", err)
|
||||
if laddr, err = net.ResolveUDPAddr("udp", ""); err != nil {
|
||||
return rel, fmt.Errorf("create udp relation and send message: resolve local/self address for UDP: %w", err)
|
||||
}
|
||||
|
||||
if udpConn, err = net.DialUDP("udp", nil, udpAddr); err != nil {
|
||||
return rel, fmt.Errorf("create udp relation and send message: dial udp: %w", err)
|
||||
if udpConn, err = net.ListenUDP("udp", laddr); err != nil {
|
||||
return rel, fmt.Errorf("create udp relation and send message: bind local/self address for UDP: %w", err)
|
||||
}
|
||||
|
||||
if _, err = udpConn.WriteTo(buf, b.cfg.BackendAddr()); err != nil {
|
||||
if _, err = udpConn.WriteTo(buf, b.backendAddr); err != nil {
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
return rel, fmt.Errorf("create udp relation and send message: write udp: %w", err)
|
||||
}
|
||||
@@ -152,7 +164,7 @@ func (b *backendUDP) relSend(rel udpRel, buf []byte) error {
|
||||
err error
|
||||
)
|
||||
|
||||
if n, err = rel.backend.WriteTo(buf, b.cfg.BackendAddr()); err != nil && n == 0 {
|
||||
if n, err = rel.backend.WriteTo(buf, b.backendAddr); err != nil && n == 0 {
|
||||
return fmt.Errorf("relSend: %w", err)
|
||||
}
|
||||
|
||||
@@ -199,7 +211,7 @@ func (b *backendUDP) Send(ctx context.Context, addr string, p []byte) error {
|
||||
n int
|
||||
)
|
||||
|
||||
buf := b.bufPool.Get() // the buf will be released in handle
|
||||
buf := b.bufPool.Get() // the buf will be released in the handle
|
||||
n = copy(buf, p)
|
||||
|
||||
if len(p) != n {
|
||||
@@ -261,8 +273,8 @@ udpBackendLoop:
|
||||
}
|
||||
|
||||
if wn, err = b.client.WriteTo(buf[:n], rel); err != nil {
|
||||
// in case of error, never close b.client, it's a shared Packet Conn.
|
||||
// All UDP relation use a shared connection/socket back to the client
|
||||
// In case of error, never close b.client, it's a shared Packet Conn.
|
||||
// All UDP relations use a shared connection/socket back to the client
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
rel.ctxCancel()
|
||||
break udpBackendLoop
|
||||
|
||||
Reference in New Issue
Block a user