From 4bb60ce4d80aaa3937858b9aedfd6c4d3129e8fd Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Thu, 9 Nov 2017 21:01:09 +0000 Subject: [PATCH] Revert "server: drop graceful handling (#546)" (#560) This reverts commit 8223ae840e47f0c9a1ea1d5c22bdd03b30b6b3cf. --- client_test.go | 2 +- server.go | 104 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 87 insertions(+), 19 deletions(-) diff --git a/client_test.go b/client_test.go index 61e2f7ea..3800df81 100644 --- a/client_test.go +++ b/client_test.go @@ -525,7 +525,7 @@ func TestTimeout(t *testing.T) { length := time.Since(start) if length > allowable { - t.Errorf("exchange took longer (%v) than specified Timeout (%v)", length, allowable) + t.Errorf("exchange took longer (%v) than specified Timeout (%v)", length, timeout) } } diff --git a/server.go b/server.go index c0faf00f..0ca6e008 100644 --- a/server.go +++ b/server.go @@ -296,10 +296,22 @@ type Server struct { DecorateReader DecorateReader // DecorateWriter is optional, allows customization of the process that writes raw DNS messages. DecorateWriter DecorateWriter + + // Graceful shutdown handling + + inFlight sync.WaitGroup + + lock sync.RWMutex + started bool } // ListenAndServe starts a nameserver on the configured address in *Server. func (srv *Server) ListenAndServe() error { + srv.lock.Lock() + defer srv.lock.Unlock() + if srv.started { + return &Error{err: "server already started"} + } addr := srv.Addr if addr == "" { addr = ":domain" @@ -318,7 +330,10 @@ func (srv *Server) ListenAndServe() error { return err } srv.Listener = l + srv.started = true + srv.lock.Unlock() err = srv.serveTCP(l) + srv.lock.Lock() // to satisfy the defer at the top return err case "tcp-tls", "tcp4-tls", "tcp6-tls": network := "tcp" @@ -333,7 +348,10 @@ func (srv *Server) ListenAndServe() error { return err } srv.Listener = l + srv.started = true + srv.lock.Unlock() err = srv.serveTCP(l) + srv.lock.Lock() // to satisfy the defer at the top return err case "udp", "udp4", "udp6": a, err := net.ResolveUDPAddr(srv.Net, addr) @@ -348,7 +366,10 @@ func (srv *Server) ListenAndServe() error { return e } srv.PacketConn = l + srv.started = true + srv.lock.Unlock() err = srv.serveUDP(l) + srv.lock.Lock() // to satisfy the defer at the top return err } return &Error{err: "bad network"} @@ -357,38 +378,72 @@ func (srv *Server) ListenAndServe() error { // ActivateAndServe starts a nameserver with the PacketConn or Listener // configured in *Server. Its main use is to start a server from systemd. func (srv *Server) ActivateAndServe() error { + srv.lock.Lock() + defer srv.lock.Unlock() + if srv.started { + return &Error{err: "server already started"} + } pConn := srv.PacketConn l := srv.Listener - - if srv.UDPSize == 0 { - srv.UDPSize = MinMsgSize - } - // Check PacketConn interface's type is valid and value - // is not nil - if t, ok := pConn.(*net.UDPConn); ok && t != nil { - if e := setUDPSocketOptions(t); e != nil { + if pConn != nil { + if srv.UDPSize == 0 { + srv.UDPSize = MinMsgSize + } + // Check PacketConn interface's type is valid and value + // is not nil + if t, ok := pConn.(*net.UDPConn); ok && t != nil { + if e := setUDPSocketOptions(t); e != nil { + return e + } + srv.started = true + srv.lock.Unlock() + e := srv.serveUDP(t) + srv.lock.Lock() // to satisfy the defer at the top return e } - e := srv.serveUDP(t) - return e } - if l != nil { + srv.started = true + srv.lock.Unlock() e := srv.serveTCP(l) + srv.lock.Lock() // to satisfy the defer at the top return e } return &Error{err: "bad listeners"} } -// Shutdown shuts down a server. After a call to Shutdown, ListenAndServe and ActivateAndServe will return. +// Shutdown gracefully shuts down a server. After a call to Shutdown, ListenAndServe and +// ActivateAndServe will return. All in progress queries are completed before the server +// is taken down. If the Shutdown is taking longer than the reading timeout an error +// is returned. func (srv *Server) Shutdown() error { + srv.lock.Lock() + if !srv.started { + srv.lock.Unlock() + return &Error{err: "server not started"} + } + srv.started = false + srv.lock.Unlock() + if srv.PacketConn != nil { srv.PacketConn.Close() } if srv.Listener != nil { srv.Listener.Close() } - return nil + + fin := make(chan bool) + go func() { + srv.inFlight.Wait() + fin <- true + }() + + select { + case <-time.After(srv.getReadTimeout()): + return &Error{err: "server shutdown is pending"} + case <-fin: + return nil + } } // getReadTimeout is a helper func to use system timeout if server did not intend to change it. @@ -429,9 +484,16 @@ func (srv *Server) serveTCP(l net.Listener) error { return err } m, err := reader.ReadTCP(rw, rtimeout) + srv.lock.RLock() + if !srv.started { + srv.lock.RUnlock() + return nil + } + srv.lock.RUnlock() if err != nil { continue } + srv.inFlight.Add(1) go srv.serve(rw.RemoteAddr(), handler, m, nil, nil, rw) } } @@ -458,18 +520,24 @@ func (srv *Server) serveUDP(l *net.UDPConn) error { // deadline is not used here for { m, s, err := reader.ReadUDP(l, rtimeout) - if err != nil { - if neterr, ok := err.(net.Error); ok && neterr.Temporary() { - continue - } - return err + srv.lock.RLock() + if !srv.started { + srv.lock.RUnlock() + return nil } + srv.lock.RUnlock() + if err != nil { + continue + } + srv.inFlight.Add(1) go srv.serve(s.RemoteAddr(), handler, m, l, s, nil) } } // Serve a new connection. func (srv *Server) serve(a net.Addr, h Handler, m []byte, u *net.UDPConn, s *SessionUDP, t net.Conn) { + defer srv.inFlight.Done() + w := &response{tsigSecret: srv.TsigSecret, udp: u, tcp: t, remoteAddr: a, udpSession: s} if srv.DecorateWriter != nil { w.writer = srv.DecorateWriter(w)