feat(reader): added WaitAvailable() method

This commit is contained in:
Suyono 2021-02-01 18:37:02 +07:00
parent c64e032934
commit 9ee2d64865
1 changed files with 23 additions and 9 deletions

View File

@ -151,15 +151,9 @@ func (r *Reader) Read(data []byte) (int, error) {
err error
)
if r.closed {
return 0, ErrReaderClosed
}
for r.readPos >= r.multiReader.lastWritePos {
if r.multiReader.closed {
return 0, ErrMultiReaderClosed
}
r.cond.Wait()
_, err = r.waitAvailable()
if err != nil {
return 0, err
}
n, err = r.readAhead(data)
@ -215,6 +209,26 @@ func (r *Reader) ReadAhead(data []byte) (int, error) {
return r.readAhead(data)
}
func (r *Reader) waitAvailable() (int, error) {
for r.readPos >= r.multiReader.lastWritePos {
if r.closed {
return 0, ErrReaderClosed
}
if r.multiReader.closed {
return 0, ErrMultiReaderClosed
}
r.cond.Wait()
}
return r.multiReader.lastWritePos - r.readPos, nil
}
func (r *Reader) WaitAvailable() (int, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.waitAvailable()
}
func (r *Reader) Close() error {
r.mtx.Lock()
defer r.mtx.Unlock()