Manually revert go workers (98a1ef45) (#928)

Manually revert the worker model.

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben 2019-03-09 13:34:22 +00:00 committed by GitHub
parent 487e4636d5
commit 284bad20d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 96 deletions

110
server.go
View File

@ -12,26 +12,12 @@ import (
"net" "net"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
// Default maximum number of TCP queries before we close the socket. // Default maximum number of TCP queries before we close the socket.
const maxTCPQueries = 128 const maxTCPQueries = 128
// The maximum number of idle workers.
//
// This controls the maximum number of workers that are allowed to stay
// idle waiting for incoming requests before being torn down.
//
// If this limit is reached, the server will just keep spawning new
// workers (goroutines) for each incoming request. In this case, each
// worker will only be used for a single request.
const maxIdleWorkersCount = 10000
// The maximum length of time a worker may idle for before being destroyed.
const idleWorkerTimeout = 10 * time.Second
// aLongTimeAgo is a non-zero time, far in the past, used for // aLongTimeAgo is a non-zero time, far in the past, used for
// immediate cancelation of network operations. // immediate cancelation of network operations.
var aLongTimeAgo = time.Unix(1, 0) var aLongTimeAgo = time.Unix(1, 0)
@ -92,7 +78,6 @@ type response struct {
tcp net.Conn // i/o connection if TCP was used tcp net.Conn // i/o connection if TCP was used
udpSession *SessionUDP // oob data to get egress interface right udpSession *SessionUDP // oob data to get egress interface right
writer Writer // writer to output the raw DNS bits writer Writer // writer to output the raw DNS bits
wg *sync.WaitGroup // for gracefull shutdown
} }
// HandleFailed returns a HandlerFunc that returns SERVFAIL for every request it gets. // HandleFailed returns a HandlerFunc that returns SERVFAIL for every request it gets.
@ -218,11 +203,6 @@ type Server struct {
// By default DefaultMsgAcceptFunc will be used. // By default DefaultMsgAcceptFunc will be used.
MsgAcceptFunc MsgAcceptFunc MsgAcceptFunc MsgAcceptFunc
// UDP packet or TCP connection queue
queue chan *response
// Workers count
workersCount int32
// Shutdown handling // Shutdown handling
lock sync.RWMutex lock sync.RWMutex
started bool started bool
@ -240,51 +220,6 @@ func (srv *Server) isStarted() bool {
return started return started
} }
func (srv *Server) worker(w *response) {
srv.serve(w)
for {
count := atomic.LoadInt32(&srv.workersCount)
if count > maxIdleWorkersCount {
return
}
if atomic.CompareAndSwapInt32(&srv.workersCount, count, count+1) {
break
}
}
defer atomic.AddInt32(&srv.workersCount, -1)
inUse := false
timeout := time.NewTimer(idleWorkerTimeout)
defer timeout.Stop()
LOOP:
for {
select {
case w, ok := <-srv.queue:
if !ok {
break LOOP
}
inUse = true
srv.serve(w)
case <-timeout.C:
if !inUse {
break LOOP
}
inUse = false
timeout.Reset(idleWorkerTimeout)
}
}
}
func (srv *Server) spawnWorker(w *response) {
select {
case srv.queue <- w:
default:
go srv.worker(w)
}
}
func makeUDPBuffer(size int) func() interface{} { func makeUDPBuffer(size int) func() interface{} {
return func() interface{} { return func() interface{} {
return make([]byte, size) return make([]byte, size)
@ -292,8 +227,6 @@ func makeUDPBuffer(size int) func() interface{} {
} }
func (srv *Server) init() { func (srv *Server) init() {
srv.queue = make(chan *response)
srv.shutdown = make(chan struct{}) srv.shutdown = make(chan struct{})
srv.conns = make(map[net.Conn]struct{}) srv.conns = make(map[net.Conn]struct{})
@ -328,7 +261,6 @@ func (srv *Server) ListenAndServe() error {
} }
srv.init() srv.init()
defer close(srv.queue)
switch srv.Net { switch srv.Net {
case "tcp", "tcp4", "tcp6": case "tcp", "tcp4", "tcp6":
@ -383,7 +315,6 @@ func (srv *Server) ActivateAndServe() error {
} }
srv.init() srv.init()
defer close(srv.queue)
pConn := srv.PacketConn pConn := srv.PacketConn
l := srv.Listener l := srv.Listener
@ -499,11 +430,7 @@ func (srv *Server) serveTCP(l net.Listener) error {
srv.conns[rw] = struct{}{} srv.conns[rw] = struct{}{}
srv.lock.Unlock() srv.lock.Unlock()
wg.Add(1) wg.Add(1)
srv.spawnWorker(&response{ go srv.serve(&response{tsigSecret: srv.TsigSecret, tcp: rw}, &wg)
tsigSecret: srv.TsigSecret,
tcp: rw,
wg: &wg,
})
} }
return nil return nil
@ -548,19 +475,13 @@ func (srv *Server) serveUDP(l *net.UDPConn) error {
continue continue
} }
wg.Add(1) wg.Add(1)
srv.spawnWorker(&response{ go srv.serve(&response{msg: m, tsigSecret: srv.TsigSecret, udp: l, udpSession: s}, &wg)
msg: m,
tsigSecret: srv.TsigSecret,
udp: l,
udpSession: s,
wg: &wg,
})
} }
return nil return nil
} }
func (srv *Server) serve(w *response) { func (srv *Server) serve(w *response, wg *sync.WaitGroup) {
if srv.DecorateWriter != nil { if srv.DecorateWriter != nil {
w.writer = srv.DecorateWriter(w) w.writer = srv.DecorateWriter(w)
} else { } else {
@ -570,23 +491,10 @@ func (srv *Server) serve(w *response) {
if w.udp != nil { if w.udp != nil {
// serve UDP // serve UDP
srv.serveDNS(w) srv.serveDNS(w)
wg.Done()
w.wg.Done()
return return
} }
defer func() {
if !w.hijacked {
w.Close()
}
srv.lock.Lock()
delete(srv.conns, w.tcp)
srv.lock.Unlock()
w.wg.Done()
}()
reader := Reader(defaultReader{srv}) reader := Reader(defaultReader{srv})
if srv.DecorateReader != nil { if srv.DecorateReader != nil {
reader = srv.DecorateReader(reader) reader = srv.DecorateReader(reader)
@ -622,6 +530,16 @@ func (srv *Server) serve(w *response) {
// idle timeout. // idle timeout.
timeout = idleTimeout timeout = idleTimeout
} }
if !w.hijacked {
w.Close()
}
srv.lock.Lock()
delete(srv.conns, w.tcp)
srv.lock.Unlock()
wg.Done()
} }
func (srv *Server) disposeBuffer(w *response) { func (srv *Server) disposeBuffer(w *response) {