diff --git a/README.md b/README.md new file mode 100644 index 0000000..94c8d87 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# MultiReader +Single writer with multiple readers. Designed for small number of concurrent readers. \ No newline at end of file diff --git a/multireader.go b/multireader.go index 1cb8cf8..c343ca5 100644 --- a/multireader.go +++ b/multireader.go @@ -91,6 +91,31 @@ func (m *MultiReader) shift() { } } +func (m *MultiReader) removeReader(id int) { + var ( + pin int + nb []*Reader + ) + for idx := range m.pos { + if m.pos[idx].id == id { + pin = idx + break + } + } + + nb = make([]*Reader, len(m.pos)-1) + if pin == 0 { + copy(nb, m.pos[1:]) + } else if pin == len(m.pos)-1 { + copy(nb, m.pos[:len(m.pos)-1]) + } else { + copy(nb, m.pos[:pin]) + copy(nb[pin:], m.pos[pin+1:]) + } + + m.pos = nb +} + func (m *MultiReader) Close() error { m.mtx.Lock() defer m.mtx.Unlock() @@ -126,15 +151,9 @@ func (r *Reader) Read(data []byte) (int, error) { err error ) - if r.closed { - return 0, ErrReaderClosed - } - - for r.readPos >= r.multiReader.lastWritePos { - if r.multiReader.closed { - return 0, ErrMultiReaderClosed - } - r.cond.Wait() + _, err = r.waitAvailable() + if err != nil { + return 0, err } n, err = r.readAhead(data) @@ -155,6 +174,8 @@ func (r *Reader) seek(offset int, whence int) (int, error) { r.readPos = offset case io.SeekEnd: r.readPos = r.multiReader.lastWritePos - offset + default: + return 0, ErrUnsupportedWhence } r.multiReader.sortLastPos() @@ -188,9 +209,31 @@ func (r *Reader) ReadAhead(data []byte) (int, error) { return r.readAhead(data) } +func (r *Reader) waitAvailable() (int, error) { + for r.readPos >= r.multiReader.lastWritePos { + if r.closed { + return 0, ErrReaderClosed + } + if r.multiReader.closed { + return 0, ErrMultiReaderClosed + } + r.cond.Wait() + } + + return r.multiReader.lastWritePos - r.readPos, nil +} + +func (r *Reader) WaitAvailable() (int, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.waitAvailable() +} + func (r *Reader) Close() error { r.mtx.Lock() defer r.mtx.Unlock() r.closed = true + + r.multiReader.removeReader(r.id) return nil }