From 284bad20d83ddb85337dd15d91307de3004980a0 Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Sat, 9 Mar 2019 13:34:22 +0000 Subject: [PATCH] Manually revert go workers (98a1ef45) (#928) Manually revert the worker model. Signed-off-by: Miek Gieben --- server.go | 110 +++++++----------------------------------------------- 1 file changed, 14 insertions(+), 96 deletions(-) diff --git a/server.go b/server.go index d4364d6e..0917e0cf 100644 --- a/server.go +++ b/server.go @@ -12,26 +12,12 @@ import ( "net" "strings" "sync" - "sync/atomic" "time" ) // Default maximum number of TCP queries before we close the socket. const maxTCPQueries = 128 -// The maximum number of idle workers. -// -// This controls the maximum number of workers that are allowed to stay -// idle waiting for incoming requests before being torn down. -// -// If this limit is reached, the server will just keep spawning new -// workers (goroutines) for each incoming request. In this case, each -// worker will only be used for a single request. -const maxIdleWorkersCount = 10000 - -// The maximum length of time a worker may idle for before being destroyed. -const idleWorkerTimeout = 10 * time.Second - // aLongTimeAgo is a non-zero time, far in the past, used for // immediate cancelation of network operations. var aLongTimeAgo = time.Unix(1, 0) @@ -92,7 +78,6 @@ type response struct { tcp net.Conn // i/o connection if TCP was used udpSession *SessionUDP // oob data to get egress interface right writer Writer // writer to output the raw DNS bits - wg *sync.WaitGroup // for gracefull shutdown } // HandleFailed returns a HandlerFunc that returns SERVFAIL for every request it gets. @@ -218,11 +203,6 @@ type Server struct { // By default DefaultMsgAcceptFunc will be used. MsgAcceptFunc MsgAcceptFunc - // UDP packet or TCP connection queue - queue chan *response - // Workers count - workersCount int32 - // Shutdown handling lock sync.RWMutex started bool @@ -240,51 +220,6 @@ func (srv *Server) isStarted() bool { return started } -func (srv *Server) worker(w *response) { - srv.serve(w) - - for { - count := atomic.LoadInt32(&srv.workersCount) - if count > maxIdleWorkersCount { - return - } - if atomic.CompareAndSwapInt32(&srv.workersCount, count, count+1) { - break - } - } - - defer atomic.AddInt32(&srv.workersCount, -1) - - inUse := false - timeout := time.NewTimer(idleWorkerTimeout) - defer timeout.Stop() -LOOP: - for { - select { - case w, ok := <-srv.queue: - if !ok { - break LOOP - } - inUse = true - srv.serve(w) - case <-timeout.C: - if !inUse { - break LOOP - } - inUse = false - timeout.Reset(idleWorkerTimeout) - } - } -} - -func (srv *Server) spawnWorker(w *response) { - select { - case srv.queue <- w: - default: - go srv.worker(w) - } -} - func makeUDPBuffer(size int) func() interface{} { return func() interface{} { return make([]byte, size) @@ -292,8 +227,6 @@ func makeUDPBuffer(size int) func() interface{} { } func (srv *Server) init() { - srv.queue = make(chan *response) - srv.shutdown = make(chan struct{}) srv.conns = make(map[net.Conn]struct{}) @@ -328,7 +261,6 @@ func (srv *Server) ListenAndServe() error { } srv.init() - defer close(srv.queue) switch srv.Net { case "tcp", "tcp4", "tcp6": @@ -383,7 +315,6 @@ func (srv *Server) ActivateAndServe() error { } srv.init() - defer close(srv.queue) pConn := srv.PacketConn l := srv.Listener @@ -499,11 +430,7 @@ func (srv *Server) serveTCP(l net.Listener) error { srv.conns[rw] = struct{}{} srv.lock.Unlock() wg.Add(1) - srv.spawnWorker(&response{ - tsigSecret: srv.TsigSecret, - tcp: rw, - wg: &wg, - }) + go srv.serve(&response{tsigSecret: srv.TsigSecret, tcp: rw}, &wg) } return nil @@ -548,19 +475,13 @@ func (srv *Server) serveUDP(l *net.UDPConn) error { continue } wg.Add(1) - srv.spawnWorker(&response{ - msg: m, - tsigSecret: srv.TsigSecret, - udp: l, - udpSession: s, - wg: &wg, - }) + go srv.serve(&response{msg: m, tsigSecret: srv.TsigSecret, udp: l, udpSession: s}, &wg) } return nil } -func (srv *Server) serve(w *response) { +func (srv *Server) serve(w *response, wg *sync.WaitGroup) { if srv.DecorateWriter != nil { w.writer = srv.DecorateWriter(w) } else { @@ -570,23 +491,10 @@ func (srv *Server) serve(w *response) { if w.udp != nil { // serve UDP srv.serveDNS(w) - - w.wg.Done() + wg.Done() return } - defer func() { - if !w.hijacked { - w.Close() - } - - srv.lock.Lock() - delete(srv.conns, w.tcp) - srv.lock.Unlock() - - w.wg.Done() - }() - reader := Reader(defaultReader{srv}) if srv.DecorateReader != nil { reader = srv.DecorateReader(reader) @@ -622,6 +530,16 @@ func (srv *Server) serve(w *response) { // idle timeout. timeout = idleTimeout } + + if !w.hijacked { + w.Close() + } + + srv.lock.Lock() + delete(srv.conns, w.tcp) + srv.lock.Unlock() + + wg.Done() } func (srv *Server) disposeBuffer(w *response) {