Restored code when shutdown needs an exchange...

Clients sents NULL-packet to server which helps to avoid
timeout. Timeout is still possible to encounter.
Shutdown will likely report error for those cases.
This commit is contained in:
Alex Sergeyev 2014-08-29 14:27:53 -04:00
parent c618657d9b
commit fce659ba46
1 changed files with 22 additions and 21 deletions

View File

@ -11,7 +11,6 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -306,12 +305,14 @@ func (srv *Server) ActivateAndServe() error {
// is taken down. If the Shutdown was not succesful an error is taking longer than reading // is taken down. If the Shutdown was not succesful an error is taking longer than reading
// timeout. // timeout.
func (srv *Server) Shutdown() error { func (srv *Server) Shutdown() error {
net := srv.Net net, addr := srv.Net, srv.Addr
switch { switch {
case srv.Listener != nil: case srv.Listener != nil:
net = srv.Listener.Addr().Network() a := srv.Listener.Addr()
net, addr = a.Network(), a.String()
case srv.PacketConn != nil: case srv.PacketConn != nil:
net = srv.PacketConn.LocalAddr().Network() a := srv.PacketConn.LocalAddr()
net, addr = a.Network(), a.String()
} }
fin := make(chan bool) fin := make(chan bool)
@ -331,6 +332,9 @@ func (srv *Server) Shutdown() error {
}() }()
} }
c := &Client{Net: net}
go c.Exchange(new(Msg), addr) // extra query to help ReadXXX loop to pass
select { select {
case <-time.After(srv.getReadTimeout()): case <-time.After(srv.getReadTimeout()):
return &Error{err: "server shutdown is pending"} return &Error{err: "server shutdown is pending"}
@ -358,25 +362,24 @@ func (srv *Server) serveTCP(l *net.TCPListener) error {
} }
rtimeout := srv.getReadTimeout() rtimeout := srv.getReadTimeout()
// deadline is not used here // deadline is not used here
done := int32(0) for {
go func() {
<-srv.stopTCP // there is no way out of serving but to receive stop
l.SetDeadline(time.Now())
atomic.StoreInt32(&done, 1)
}()
for done == 0 {
rw, e := l.AcceptTCP() rw, e := l.AcceptTCP()
if e != nil { if e != nil {
continue continue
} }
m, e := srv.readTCP(rw, rtimeout) m, e := srv.readTCP(rw, rtimeout)
select {
case <-srv.stopTCP:
return nil
default:
}
if e != nil { if e != nil {
continue continue
} }
srv.wgTCP.Add(1) srv.wgTCP.Add(1)
go srv.serve(rw.RemoteAddr(), handler, m, nil, nil, rw) go srv.serve(rw.RemoteAddr(), handler, m, nil, nil, rw)
} }
return nil panic("dns: not reached")
} }
// serveUDP starts a UDP listener for the server. // serveUDP starts a UDP listener for the server.
@ -390,22 +393,20 @@ func (srv *Server) serveUDP(l *net.UDPConn) error {
} }
rtimeout := srv.getReadTimeout() rtimeout := srv.getReadTimeout()
// deadline is not used here // deadline is not used here
done := int32(0) for {
go func() {
<-srv.stopUDP // there is no way out of serving but to receive stop
l.SetDeadline(time.Now())
atomic.StoreInt32(&done, 1)
}()
for done == 0 {
m, s, e := srv.readUDP(l, rtimeout) m, s, e := srv.readUDP(l, rtimeout)
select {
case <-srv.stopUDP:
return nil
default:
}
if e != nil { if e != nil {
continue continue
} }
srv.wgUDP.Add(1) srv.wgUDP.Add(1)
go srv.serve(s.RemoteAddr(), handler, m, l, s, nil) go srv.serve(s.RemoteAddr(), handler, m, l, s, nil)
} }
srv.wgUDP.Wait() panic("dns: not reached")
return nil
} }
// Serve a new connection. // Serve a new connection.