Compare commits
No commits in common. "9553f87292c86bbb41f8dcc5b8c3108c2631c40b" and "7aa1f19f568f72bc7085446581867fbe89ce45df" have entirely different histories.
9553f87292
...
7aa1f19f56
@ -1,2 +0,0 @@
|
|||||||
# MultiReader
|
|
||||||
Single writer with multiple readers. Designed for small number of concurrent readers.
|
|
||||||
@ -91,31 +91,6 @@ func (m *MultiReader) shift() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MultiReader) removeReader(id int) {
|
|
||||||
var (
|
|
||||||
pin int
|
|
||||||
nb []*Reader
|
|
||||||
)
|
|
||||||
for idx := range m.pos {
|
|
||||||
if m.pos[idx].id == id {
|
|
||||||
pin = idx
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nb = make([]*Reader, len(m.pos)-1)
|
|
||||||
if pin == 0 {
|
|
||||||
copy(nb, m.pos[1:])
|
|
||||||
} else if pin == len(m.pos)-1 {
|
|
||||||
copy(nb, m.pos[:len(m.pos)-1])
|
|
||||||
} else {
|
|
||||||
copy(nb, m.pos[:pin])
|
|
||||||
copy(nb[pin:], m.pos[pin+1:])
|
|
||||||
}
|
|
||||||
|
|
||||||
m.pos = nb
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MultiReader) Close() error {
|
func (m *MultiReader) Close() error {
|
||||||
m.mtx.Lock()
|
m.mtx.Lock()
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
@ -151,9 +126,15 @@ func (r *Reader) Read(data []byte) (int, error) {
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
_, err = r.waitAvailable()
|
if r.closed {
|
||||||
if err != nil {
|
return 0, ErrReaderClosed
|
||||||
return 0, err
|
}
|
||||||
|
|
||||||
|
for r.readPos >= r.multiReader.lastWritePos {
|
||||||
|
if r.multiReader.closed {
|
||||||
|
return 0, ErrMultiReaderClosed
|
||||||
|
}
|
||||||
|
r.cond.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err = r.readAhead(data)
|
n, err = r.readAhead(data)
|
||||||
@ -174,8 +155,6 @@ func (r *Reader) seek(offset int, whence int) (int, error) {
|
|||||||
r.readPos = offset
|
r.readPos = offset
|
||||||
case io.SeekEnd:
|
case io.SeekEnd:
|
||||||
r.readPos = r.multiReader.lastWritePos - offset
|
r.readPos = r.multiReader.lastWritePos - offset
|
||||||
default:
|
|
||||||
return 0, ErrUnsupportedWhence
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r.multiReader.sortLastPos()
|
r.multiReader.sortLastPos()
|
||||||
@ -209,31 +188,9 @@ func (r *Reader) ReadAhead(data []byte) (int, error) {
|
|||||||
return r.readAhead(data)
|
return r.readAhead(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) waitAvailable() (int, error) {
|
|
||||||
for r.readPos >= r.multiReader.lastWritePos {
|
|
||||||
if r.closed {
|
|
||||||
return 0, ErrReaderClosed
|
|
||||||
}
|
|
||||||
if r.multiReader.closed {
|
|
||||||
return 0, ErrMultiReaderClosed
|
|
||||||
}
|
|
||||||
r.cond.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.multiReader.lastWritePos - r.readPos, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Reader) WaitAvailable() (int, error) {
|
|
||||||
r.mtx.Lock()
|
|
||||||
defer r.mtx.Unlock()
|
|
||||||
return r.waitAvailable()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Reader) Close() error {
|
func (r *Reader) Close() error {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
r.closed = true
|
r.closed = true
|
||||||
|
|
||||||
r.multiReader.removeReader(r.id)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user