package multireader import ( "errors" "io" "sort" "sync" ) var ( ErrMultiReaderClosed = errors.New("multireader closed") ErrReaderClosed = errors.New("reader closed") ErrOutOfBound = errors.New("seek beyond writer") ErrUnsupportedWhence = errors.New("unsupported whence") ) type MultiReader struct { mtx *sync.Mutex cond *sync.Cond buffer []byte readers []*Reader pos []*Reader notif []*Reader lastWritePos int closed bool } type Reader struct { multiReader *MultiReader mtx *sync.Mutex cond *sync.Cond notify func(int) id int readPos int closed bool } //MultiReader initialize new MultiReader object. The object also implements WriteCloser. func NewMultiReader(len int) *MultiReader { if len < 0 { len = 0 } buff := make([]byte, len) mtx := &sync.Mutex{} return &MultiReader{ mtx: mtx, cond: sync.NewCond(mtx), buffer: buff, readers: make([]*Reader, 0), pos: make([]*Reader, 0), notif: make([]*Reader, 0), lastWritePos: 0, closed: false, } } func (m *MultiReader) notify(dataLen int) { m.cond.Broadcast() for _, reader := range m.notif { if reader.notify != nil && reader.readPos >= m.lastWritePos-dataLen && !reader.closed { go func() { reader.notify(dataLen) }() } } } // Write implements Writer interface. The function will not return error. Error return only satisfy the interface. func (m *MultiReader) Write(data []byte) (int, error) { m.mtx.Lock() defer m.mtx.Unlock() var ( tmpBuff []byte dataLen int ) if m.closed { return 0, ErrMultiReaderClosed } defer func(ip *int) { m.notify(*ip) }(&dataLen) if m.lastWritePos+len(data) > len(m.buffer) { tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer)) m.buffer = append(m.buffer, tmpBuff...) } copy(m.buffer[m.lastWritePos:], data) m.lastWritePos += len(data) dataLen = len(data) //_ = dataLen //prevents lint complaining about ineffectual assignment return len(data), nil } func (m *MultiReader) sortLastPos() { sort.Slice(m.pos, func(i, j int) bool { return m.pos[i].readPos < m.pos[j].readPos }) } func (m *MultiReader) shift() { shiftPos := m.pos[0].readPos if shiftPos > 0 { for _, r := range m.pos { r.readPos -= shiftPos } copy(m.buffer, m.buffer[shiftPos:shiftPos+(m.lastWritePos-shiftPos)]) m.lastWritePos -= shiftPos } } func (m *MultiReader) removeReader(id int) { var ( pin int nb []*Reader ) for idx := range m.pos { if m.pos[idx].id == id { pin = idx break } } nb = make([]*Reader, len(m.pos)-1) if pin == 0 { copy(nb, m.pos[1:]) } else if pin == len(m.pos)-1 { copy(nb, m.pos[:len(m.pos)-1]) } else { copy(nb, m.pos[:pin]) copy(nb[pin:], m.pos[pin+1:]) } m.pos = nb } // Close will not return error. The error return only satisfy the interface signature. func (m *MultiReader) Close() error { m.mtx.Lock() defer m.mtx.Unlock() m.closed = true return nil } // NewReader initialize new Reader object. The object implements ReadCloser and Seeker interface. func (m *MultiReader) NewReader() *Reader { m.mtx.Lock() defer m.mtx.Unlock() reader := &Reader{ multiReader: m, mtx: m.mtx, cond: m.cond, id: len(m.readers), readPos: 0, } m.readers = append(m.readers, reader) m.pos = append(m.pos, reader) m.sortLastPos() return reader } // NotifyFunc sets up a callback function notify. The notify function will be called when there is a Write operation. // Notify function passed to the NotifyFunc will executed in a separate go routine for each notify function. // It's the caller responsibility to setup any synchronization to prevent data race. func (r *Reader) NotifyFunc(notify func(int)) { r.mtx.Lock() defer r.mtx.Unlock() r.notify = notify var x int switch len(r.multiReader.notif) { case 0: r.multiReader.notif = append(r.multiReader.notif, r) case 1: if r.multiReader.notif[0].id != r.id { goto notifyFuncAppend2 } default: goto notifyFuncAppend1 } return notifyFuncAppend1: x = sort.Search(len(r.multiReader.notif), func(i int) bool { return r.multiReader.notif[i].id >= r.id }) if x < len(r.multiReader.notif) && r.id == x { return } notifyFuncAppend2: r.multiReader.notif = append(r.multiReader.notif, r) sort.Slice(r.multiReader.notif, func(i, j int) bool { return r.multiReader.notif[i].id < r.multiReader.notif[j].id }) } // Read implements Reader interface. If the reader reached the same position with the writer. It will wait. // Calling Read() is the same with calling WaitAvailable(), ReadAhead(), and Seek() func (r *Reader) Read(data []byte) (int, error) { r.mtx.Lock() defer r.mtx.Unlock() var ( n int err error ) _, err = r.waitAvailable() if err != nil { return 0, err } n, err = r.readAhead(data) _, _ = r.seek(n, io.SeekCurrent) return n, err } func (r *Reader) seek(offset int, whence int) (int, error) { if offset > r.multiReader.lastWritePos || offset < 0 { return r.readPos, ErrOutOfBound } switch whence { case io.SeekCurrent: r.readPos += offset case io.SeekStart: r.readPos = offset case io.SeekEnd: r.readPos = r.multiReader.lastWritePos - offset default: return 0, ErrUnsupportedWhence } r.multiReader.sortLastPos() r.multiReader.shift() return r.readPos, nil } // Seek implements Seeker interface. The underlying function will move the reader position. // It only returns error when the whence parameter is not in io.SeekStart, io.SeekCurrent, io.SeekEnd func (r *Reader) Seek(offset int64, whence int) (int64, error) { n, err := r.seek(int(offset), whence) return int64(n), err } func (r *Reader) readAhead(data []byte) (int, error) { var err error if r.readPos+len(data) > r.multiReader.lastWritePos { copy(data, r.multiReader.buffer[r.readPos:r.multiReader.lastWritePos]) if r.multiReader.closed { err = ErrMultiReaderClosed } return r.multiReader.lastWritePos - r.readPos, err } copy(data, r.multiReader.buffer[r.readPos:r.readPos+len(data)]) return len(data), nil } // ReadAhead reads buffer ahead of current position without moving the reader position. func (r *Reader) ReadAhead(data []byte) (int, error) { r.mtx.Lock() defer r.mtx.Unlock() return r.readAhead(data) } func (r *Reader) waitAvailable() (int, error) { if r.closed { return 0, ErrReaderClosed } for r.readPos >= r.multiReader.lastWritePos { if r.multiReader.closed { return 0, ErrMultiReaderClosed } r.cond.Wait() } return r.multiReader.lastWritePos - r.readPos, nil } // WaitAvailable will only wait when Reader position reached Writer position. func (r *Reader) WaitAvailable() (int, error) { r.mtx.Lock() defer r.mtx.Unlock() return r.waitAvailable() } // Close implements ReadCloser interface. It will not return error. func (r *Reader) Close() error { r.mtx.Lock() defer r.mtx.Unlock() r.closed = true r.multiReader.removeReader(r.id) return nil }