Perform query inflight deduplication
Taking a queue from Go tip, I copied over the new code and tweaked it a bit for use in Go DNS.
This commit is contained in:
parent
5a15fb1e8f
commit
d117fda34b
43
client.go
43
client.go
|
@ -36,6 +36,41 @@ type Client struct {
|
||||||
WriteTimeout time.Duration // the net.Conn.SetWriteTimeout value for new connections (ns), defaults to 2 * 1e9
|
WriteTimeout time.Duration // the net.Conn.SetWriteTimeout value for new connections (ns), defaults to 2 * 1e9
|
||||||
TsigSecret map[string]string // secret(s) for Tsig map[<zonename>]<base64 secret>, zonename must be fully qualified
|
TsigSecret map[string]string // secret(s) for Tsig map[<zonename>]<base64 secret>, zonename must be fully qualified
|
||||||
Inflight bool // if true suppress multiple outstanding queries for the same Qname, Qtype and Qclass
|
Inflight bool // if true suppress multiple outstanding queries for the same Qname, Qtype and Qclass
|
||||||
|
group singleflight
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) exchangeMerge(m *Msg, a string, s net.Conn) (r *Msg, rtt time.Duration, err error) {
|
||||||
|
if !c.Inflight {
|
||||||
|
if s == nil {
|
||||||
|
return c.exchange(m, a)
|
||||||
|
}
|
||||||
|
return c.exchangeConn(m, s)
|
||||||
|
}
|
||||||
|
// This adds a bunch of garbage, TODO(miek).
|
||||||
|
t := "nop"
|
||||||
|
if t1, ok := TypeToString[r.Question[0].Qtype]; ok {
|
||||||
|
t = t1
|
||||||
|
}
|
||||||
|
cl := "nop"
|
||||||
|
if cl1, ok := ClassToString[r.Question[0].Qclass]; ok {
|
||||||
|
cl = cl1
|
||||||
|
}
|
||||||
|
key := r.Question[0].Name + t + cl
|
||||||
|
r, rtt, err, shared := c.group.Do(key, func() (*Msg, time.Duration, error) {
|
||||||
|
if s == nil {
|
||||||
|
return c.exchange(m, a)
|
||||||
|
}
|
||||||
|
return c.exchangeConn(m, s)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return r, rtt, err
|
||||||
|
}
|
||||||
|
if shared {
|
||||||
|
r1 := r.copy()
|
||||||
|
r1.Id = r.Id // Copy Id!
|
||||||
|
r = r1
|
||||||
|
}
|
||||||
|
return r, rtt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exchange performs an synchronous query. It sends the message m to the address
|
// Exchange performs an synchronous query. It sends the message m to the address
|
||||||
|
@ -45,6 +80,10 @@ type Client struct {
|
||||||
// in, rtt, err := c.Exchange(message, "127.0.0.1:53")
|
// in, rtt, err := c.Exchange(message, "127.0.0.1:53")
|
||||||
//
|
//
|
||||||
func (c *Client) Exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err error) {
|
func (c *Client) Exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err error) {
|
||||||
|
return c.exchangeMerge(m, a, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err error) {
|
||||||
w := &reply{client: c, addr: a}
|
w := &reply{client: c, addr: a}
|
||||||
if err = w.dial(); err != nil {
|
if err = w.dial(); err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
|
@ -60,6 +99,10 @@ func (c *Client) Exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err erro
|
||||||
// ExchangeConn performs an synchronous query. It sends the message m trough the
|
// ExchangeConn performs an synchronous query. It sends the message m trough the
|
||||||
// connection s and waits for a reply.
|
// connection s and waits for a reply.
|
||||||
func (c *Client) ExchangeConn(m *Msg, s net.Conn) (r *Msg, rtt time.Duration, err error) {
|
func (c *Client) ExchangeConn(m *Msg, s net.Conn) (r *Msg, rtt time.Duration, err error) {
|
||||||
|
return c.exchangeMerge(m, "", s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) exchangeConn(m *Msg, s net.Conn) (r *Msg, rtt time.Duration, err error) {
|
||||||
w := &reply{client: c, conn: s}
|
w := &reply{client: c, conn: s}
|
||||||
if err = w.send(m); err != nil {
|
if err = w.send(m); err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
|
|
Loading…
Reference in New Issue