From 7aa1f19f568f72bc7085446581867fbe89ce45df Mon Sep 17 00:00:00 2001 From: Suyono Date: Sun, 31 Jan 2021 01:19:37 +0700 Subject: [PATCH] initial work --- .idea/.gitignore | 8 ++ .idea/modules.xml | 8 ++ .idea/multireader.iml | 9 ++ .idea/vcs.xml | 6 ++ .idea/watcherTasks.xml | 53 +++++++++++ go.mod | 3 + multiread_test.go | 114 ++++++++++++++++++++++++ multireader.go | 196 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 397 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/modules.xml create mode 100644 .idea/multireader.iml create mode 100644 .idea/vcs.xml create mode 100644 .idea/watcherTasks.xml create mode 100644 go.mod create mode 100644 multiread_test.go create mode 100644 multireader.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..73f69e0 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..0fe10e5 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/multireader.iml b/.idea/multireader.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/multireader.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/watcherTasks.xml b/.idea/watcherTasks.xml new file mode 100644 index 0000000..d1dc02b --- /dev/null +++ b/.idea/watcherTasks.xml @@ -0,0 +1,53 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..da11c58 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module suyono.dev/code/multireader + +go 1.15 diff --git a/multiread_test.go b/multiread_test.go new file mode 100644 index 0000000..9f0113f --- /dev/null +++ b/multiread_test.go @@ -0,0 +1,114 @@ +package multireader + +import ( + "bytes" + "math/rand" + "sync" + "testing" +) + +func TestMultiRead(t *testing.T) { + t.Run("positive", func(t *testing.T) { + _ = NewMultiReader(64) + }) + t.Run("zero", func(t *testing.T) { + _ = NewMultiReader(0) + }) + t.Run("negative", func(t *testing.T) { + _ = NewMultiReader(-64) + }) +} + +func TestReadWrite(t *testing.T) { + var ( + src []byte + dst1 []byte + dst2 []byte + dst3 []byte + wg *sync.WaitGroup + ) + + src = make([]byte, 20480) + dst1 = make([]byte, 20480) + dst2 = make([]byte, 20480) + dst3 = make([]byte, 20480) + + n, err := rand.Read(src) + if n < len(src) || err != nil { + t.Fatalf("cannot initiate test data: %d; %v", n, err) + } + + m := NewMultiReader(512) + r1 := m.NewReader() + r2 := m.NewReader() + r3 := m.NewReader() + wg = &sync.WaitGroup{} + + writeFunc := func(t *testing.T, m *MultiReader, data []byte, wg *sync.WaitGroup) { + var ( + n int + dl int + r int + ) + defer wg.Done() + + for n < len(data) { + dl = rand.Intn(512) + if n+512+dl > len(data) { + r, _ = m.Write(data[n:]) + } else { + r, _ = m.Write(data[n : n+512+dl]) + } + n += r + } + _ = m.Close() + } + wg.Add(1) + go writeFunc(t, m, src, wg) + + readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup) { + var ( + offset int + n int + err error + ) + defer wg.Done() + + offset = 0 + for { + if offset+1024 < len(data) { + n, err = r.Read(data[offset : offset+1024]) + if err != nil { + t.Logf("reader got error: %v", err) + break + } + } else { + n, err = r.Read(data[offset:]) + if err != nil { + t.Logf("reader got error: %v", err) + break + } + + } + offset += n + } + } + + wg.Add(1) + go readFunc(t, r1, dst1, wg) + wg.Add(1) + go readFunc(t, r2, dst2, wg) + wg.Add(1) + go readFunc(t, r3, dst3, wg) + + wg.Wait() + if !bytes.Equal(src, dst1) { + t.Error("src and dst1 mismatch") + } + if !bytes.Equal(src, dst2) { + t.Error("src and dst2 mismatch") + } + if !bytes.Equal(src, dst3) { + t.Error("src and dst3 mismatch") + } +} diff --git a/multireader.go b/multireader.go new file mode 100644 index 0000000..1cb8cf8 --- /dev/null +++ b/multireader.go @@ -0,0 +1,196 @@ +package multireader + +import ( + "errors" + "io" + "sort" + "sync" +) + +var ( + ErrMultiReaderClosed = errors.New("multireader closed") + ErrReaderClosed = errors.New("reader closed") + ErrOutOfBound = errors.New("seek beyond writer") + ErrUnsupportedWhence = errors.New("unsupported whence") +) + +type MultiReader struct { + mtx *sync.Mutex + cond *sync.Cond + buffer []byte + readers []*Reader + pos []*Reader + lastWritePos int + closed bool +} + +type Reader struct { + multiReader *MultiReader + mtx *sync.Mutex + cond *sync.Cond + id int + readPos int + closed bool +} + +func NewMultiReader(len int) *MultiReader { + if len < 0 { + len = 0 + } + + buff := make([]byte, len) + mtx := &sync.Mutex{} + return &MultiReader{ + mtx: mtx, + cond: sync.NewCond(mtx), + buffer: buff, + readers: make([]*Reader, 0), + pos: make([]*Reader, 0), + lastWritePos: 0, + closed: false, + } +} + +func (m *MultiReader) Write(data []byte) (int, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + defer m.cond.Broadcast() + + var ( + tmpBuff []byte + ) + + if m.closed { + return 0, ErrMultiReaderClosed + } + + if m.lastWritePos+len(data) > len(m.buffer) { + tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer)) + m.buffer = append(m.buffer, tmpBuff...) + } + copy(m.buffer[m.lastWritePos:], data) + m.lastWritePos += len(data) + + return len(data), nil +} + +func (m *MultiReader) sortLastPos() { + sort.Slice(m.pos, func(i, j int) bool { + return m.pos[i].readPos < m.pos[j].readPos + }) +} + +func (m *MultiReader) shift() { + shiftPos := m.pos[0].readPos + if shiftPos > 0 { + for _, r := range m.pos { + r.readPos -= shiftPos + } + copy(m.buffer, m.buffer[shiftPos:shiftPos+(m.lastWritePos-shiftPos)]) + m.lastWritePos -= shiftPos + } +} + +func (m *MultiReader) Close() error { + m.mtx.Lock() + defer m.mtx.Unlock() + m.closed = true + return nil +} + +func (m *MultiReader) NewReader() *Reader { + m.mtx.Lock() + defer m.mtx.Unlock() + + reader := &Reader{ + multiReader: m, + mtx: m.mtx, + cond: m.cond, + id: len(m.readers), + readPos: 0, + } + m.readers = append(m.readers, reader) + m.pos = append(m.pos, reader) + m.sortLastPos() + + return reader +} + +func (r *Reader) Read(data []byte) (int, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + defer r.cond.Broadcast() + + var ( + n int + err error + ) + + if r.closed { + return 0, ErrReaderClosed + } + + for r.readPos >= r.multiReader.lastWritePos { + if r.multiReader.closed { + return 0, ErrMultiReaderClosed + } + r.cond.Wait() + } + + n, err = r.readAhead(data) + _, _ = r.seek(n, io.SeekCurrent) + + return n, err +} + +func (r *Reader) seek(offset int, whence int) (int, error) { + if offset > r.multiReader.lastWritePos || offset < 0 { + return r.readPos, ErrOutOfBound + } + + switch whence { + case io.SeekCurrent: + r.readPos += offset + case io.SeekStart: + r.readPos = offset + case io.SeekEnd: + r.readPos = r.multiReader.lastWritePos - offset + } + + r.multiReader.sortLastPos() + r.multiReader.shift() + + return r.readPos, nil +} + +func (r *Reader) Seek(offset int64, whence int) (int64, error) { + n, err := r.seek(int(offset), whence) + return int64(n), err +} + +func (r *Reader) readAhead(data []byte) (int, error) { + var err error + if r.readPos+len(data) > r.multiReader.lastWritePos { + copy(data, r.multiReader.buffer[r.readPos:r.multiReader.lastWritePos]) + if r.multiReader.closed { + err = ErrMultiReaderClosed + } + return r.multiReader.lastWritePos - r.readPos, err + } + + copy(data, r.multiReader.buffer[r.readPos:r.readPos+len(data)]) + return len(data), nil +} + +func (r *Reader) ReadAhead(data []byte) (int, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.readAhead(data) +} + +func (r *Reader) Close() error { + r.mtx.Lock() + defer r.mtx.Unlock() + r.closed = true + return nil +}