multireader/multireader.go

303 lines
6.9 KiB
Go

package multireader
import (
"errors"
"io"
"sort"
"sync"
)
var (
ErrMultiReaderClosed = errors.New("multireader closed")
ErrReaderClosed = errors.New("reader closed")
ErrOutOfBound = errors.New("seek beyond writer")
ErrUnsupportedWhence = errors.New("unsupported whence")
)
type MultiReader struct {
mtx *sync.Mutex
cond *sync.Cond
buffer []byte
readers []*Reader
pos []*Reader
notif []*Reader
lastWritePos int
closed bool
}
type Reader struct {
multiReader *MultiReader
mtx *sync.Mutex
cond *sync.Cond
notify func(int)
id int
readPos int
closed bool
}
//MultiReader initialize new MultiReader object. The object also implements WriteCloser.
func NewMultiReader(len int) *MultiReader {
if len < 0 {
len = 0
}
buff := make([]byte, len)
mtx := &sync.Mutex{}
return &MultiReader{
mtx: mtx,
cond: sync.NewCond(mtx),
buffer: buff,
readers: make([]*Reader, 0),
pos: make([]*Reader, 0),
notif: make([]*Reader, 0),
lastWritePos: 0,
closed: false,
}
}
func (m *MultiReader) notify(dataLen int) {
m.cond.Broadcast()
for _, reader := range m.notif {
if reader.notify != nil && reader.readPos >= m.lastWritePos-dataLen && !reader.closed {
go func() {
reader.notify(dataLen)
}()
}
}
}
// Write implements Writer interface. The function will not return error. Error return only satisfy the interface.
func (m *MultiReader) Write(data []byte) (int, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
var (
tmpBuff []byte
dataLen int
)
if m.closed {
return 0, ErrMultiReaderClosed
}
defer func(ip *int) {
m.notify(*ip)
}(&dataLen)
if m.lastWritePos+len(data) > len(m.buffer) {
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
m.buffer = append(m.buffer, tmpBuff...)
}
copy(m.buffer[m.lastWritePos:], data)
m.lastWritePos += len(data)
dataLen = len(data)
//_ = dataLen //prevents lint complaining about ineffectual assignment
return len(data), nil
}
func (m *MultiReader) sortLastPos() {
sort.Slice(m.pos, func(i, j int) bool {
return m.pos[i].readPos < m.pos[j].readPos
})
}
func (m *MultiReader) shift() {
shiftPos := m.pos[0].readPos
if shiftPos > 0 {
for _, r := range m.pos {
r.readPos -= shiftPos
}
copy(m.buffer, m.buffer[shiftPos:shiftPos+(m.lastWritePos-shiftPos)])
m.lastWritePos -= shiftPos
}
}
func (m *MultiReader) removeReader(id int) {
var (
pin int
nb []*Reader
)
for idx := range m.pos {
if m.pos[idx].id == id {
pin = idx
break
}
}
nb = make([]*Reader, len(m.pos)-1)
if pin == 0 {
copy(nb, m.pos[1:])
} else if pin == len(m.pos)-1 {
copy(nb, m.pos[:len(m.pos)-1])
} else {
copy(nb, m.pos[:pin])
copy(nb[pin:], m.pos[pin+1:])
}
m.pos = nb
}
// Close will not return error. The error return only satisfy the interface signature.
func (m *MultiReader) Close() error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.closed = true
return nil
}
// NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface.
func (m *MultiReader) NewReader() *Reader {
m.mtx.Lock()
defer m.mtx.Unlock()
reader := &Reader{
multiReader: m,
mtx: m.mtx,
cond: m.cond,
id: len(m.readers),
readPos: 0,
}
m.readers = append(m.readers, reader)
m.pos = append(m.pos, reader)
m.sortLastPos()
return reader
}
// NotifyFunc sets up a callback function notify. The notify function will be called when there is a Write operation.
// Notify function passed to the NotifyFunc will executed in a separate go routine for each notify function.
// It's the caller responsibility to setup any synchronization to prevent data race.
func (r *Reader) NotifyFunc(notify func(int)) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.notify = notify
var x int
switch len(r.multiReader.notif) {
case 0:
r.multiReader.notif = append(r.multiReader.notif, r)
case 1:
if r.multiReader.notif[0].id != r.id {
goto notifyFuncAppend2
}
default:
goto notifyFuncAppend1
}
return
notifyFuncAppend1:
x = sort.Search(len(r.multiReader.notif), func(i int) bool { return r.multiReader.notif[i].id >= r.id })
if x < len(r.multiReader.notif) && r.id == x {
return
}
notifyFuncAppend2:
r.multiReader.notif = append(r.multiReader.notif, r)
sort.Slice(r.multiReader.notif, func(i, j int) bool { return r.multiReader.notif[i].id < r.multiReader.notif[j].id })
}
// Read implements Reader interface. If the reader reached the same position with the writer. It will wait.
// Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek()
func (r *Reader) Read(data []byte) (int, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
var (
n int
err error
)
_, err = r.waitAvailable()
if err != nil {
return 0, err
}
n, err = r.readAhead(data)
_, _ = r.seek(n, io.SeekCurrent)
return n, err
}
func (r *Reader) seek(offset int, whence int) (int, error) {
if offset > r.multiReader.lastWritePos || offset < 0 {
return r.readPos, ErrOutOfBound
}
switch whence {
case io.SeekCurrent:
r.readPos += offset
case io.SeekStart:
r.readPos = offset
case io.SeekEnd:
r.readPos = r.multiReader.lastWritePos - offset
default:
return 0, ErrUnsupportedWhence
}
r.multiReader.sortLastPos()
r.multiReader.shift()
return r.readPos, nil
}
// Seek implements Seeker interface. The underlying function will move the reader position.
// It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
n, err := r.seek(int(offset), whence)
return int64(n), err
}
func (r *Reader) readAhead(data []byte) (int, error) {
var err error
if r.readPos+len(data) > r.multiReader.lastWritePos {
copy(data, r.multiReader.buffer[r.readPos:r.multiReader.lastWritePos])
if r.multiReader.closed {
err = ErrMultiReaderClosed
}
return r.multiReader.lastWritePos - r.readPos, err
}
copy(data, r.multiReader.buffer[r.readPos:r.readPos+len(data)])
return len(data), nil
}
// ReadAhead reads buffer ahead of current position without moving the reader position.
func (r *Reader) ReadAhead(data []byte) (int, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.readAhead(data)
}
func (r *Reader) waitAvailable() (int, error) {
if r.closed {
return 0, ErrReaderClosed
}
for r.readPos >= r.multiReader.lastWritePos {
if r.multiReader.closed {
return 0, ErrMultiReaderClosed
}
r.cond.Wait()
}
return r.multiReader.lastWritePos - r.readPos, nil
}
// WaitAvailable will only wait when Reader position reached Writer position.
func (r *Reader) WaitAvailable() (int, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.waitAvailable()
}
// Close implements ReadCloser interface. It will not return error.
func (r *Reader) Close() error {
r.mtx.Lock()
defer r.mtx.Unlock()
r.closed = true
r.multiReader.removeReader(r.id)
return nil
}