initial work

This commit is contained in:
Suyono 2021-01-31 01:19:37 +07:00
commit 7aa1f19f56
8 changed files with 397 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/multireader.iml" filepath="$PROJECT_DIR$/.idea/multireader.iml" />
</modules>
</component>
</project>

9
.idea/multireader.iml Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

53
.idea/watcherTasks.xml Normal file
View File

@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectTasksOptions">
<TaskOptions isEnabled="true">
<option name="arguments" value="-w $FilePath$" />
<option name="checkSyntaxErrors" value="true" />
<option name="description" />
<option name="exitCodeBehavior" value="ERROR" />
<option name="fileExtension" value="go" />
<option name="immediateSync" value="false" />
<option name="name" value="goimports" />
<option name="output" value="$FilePath$" />
<option name="outputFilters">
<array />
</option>
<option name="outputFromStdout" value="false" />
<option name="program" value="goimports" />
<option name="runOnExternalChanges" value="false" />
<option name="scopeName" value="Project Files" />
<option name="trackOnlyRoot" value="true" />
<option name="workingDir" value="$ProjectFileDir$" />
<envs>
<env name="GOROOT" value="$GOROOT$" />
<env name="GOPATH" value="$GOPATH$" />
<env name="PATH" value="$GoBinDirs$" />
</envs>
</TaskOptions>
<TaskOptions isEnabled="true">
<option name="arguments" value="run --disable=typecheck $FileDir$" />
<option name="checkSyntaxErrors" value="true" />
<option name="description" />
<option name="exitCodeBehavior" value="ERROR" />
<option name="fileExtension" value="go" />
<option name="immediateSync" value="false" />
<option name="name" value="golangci-lint" />
<option name="output" value="" />
<option name="outputFilters">
<array />
</option>
<option name="outputFromStdout" value="false" />
<option name="program" value="golangci-lint" />
<option name="runOnExternalChanges" value="false" />
<option name="scopeName" value="Project Files" />
<option name="trackOnlyRoot" value="true" />
<option name="workingDir" value="$ProjectFileDir$" />
<envs>
<env name="GOROOT" value="$GOROOT$" />
<env name="GOPATH" value="$GOPATH$" />
<env name="PATH" value="$GoBinDirs$" />
</envs>
</TaskOptions>
</component>
</project>

3
go.mod Normal file
View File

@ -0,0 +1,3 @@
module suyono.dev/code/multireader
go 1.15

114
multiread_test.go Normal file
View File

@ -0,0 +1,114 @@
package multireader
import (
"bytes"
"math/rand"
"sync"
"testing"
)
func TestMultiRead(t *testing.T) {
t.Run("positive", func(t *testing.T) {
_ = NewMultiReader(64)
})
t.Run("zero", func(t *testing.T) {
_ = NewMultiReader(0)
})
t.Run("negative", func(t *testing.T) {
_ = NewMultiReader(-64)
})
}
func TestReadWrite(t *testing.T) {
var (
src []byte
dst1 []byte
dst2 []byte
dst3 []byte
wg *sync.WaitGroup
)
src = make([]byte, 20480)
dst1 = make([]byte, 20480)
dst2 = make([]byte, 20480)
dst3 = make([]byte, 20480)
n, err := rand.Read(src)
if n < len(src) || err != nil {
t.Fatalf("cannot initiate test data: %d; %v", n, err)
}
m := NewMultiReader(512)
r1 := m.NewReader()
r2 := m.NewReader()
r3 := m.NewReader()
wg = &sync.WaitGroup{}
writeFunc := func(t *testing.T, m *MultiReader, data []byte, wg *sync.WaitGroup) {
var (
n int
dl int
r int
)
defer wg.Done()
for n < len(data) {
dl = rand.Intn(512)
if n+512+dl > len(data) {
r, _ = m.Write(data[n:])
} else {
r, _ = m.Write(data[n : n+512+dl])
}
n += r
}
_ = m.Close()
}
wg.Add(1)
go writeFunc(t, m, src, wg)
readFunc := func(t *testing.T, r *Reader, data []byte, wg *sync.WaitGroup) {
var (
offset int
n int
err error
)
defer wg.Done()
offset = 0
for {
if offset+1024 < len(data) {
n, err = r.Read(data[offset : offset+1024])
if err != nil {
t.Logf("reader got error: %v", err)
break
}
} else {
n, err = r.Read(data[offset:])
if err != nil {
t.Logf("reader got error: %v", err)
break
}
}
offset += n
}
}
wg.Add(1)
go readFunc(t, r1, dst1, wg)
wg.Add(1)
go readFunc(t, r2, dst2, wg)
wg.Add(1)
go readFunc(t, r3, dst3, wg)
wg.Wait()
if !bytes.Equal(src, dst1) {
t.Error("src and dst1 mismatch")
}
if !bytes.Equal(src, dst2) {
t.Error("src and dst2 mismatch")
}
if !bytes.Equal(src, dst3) {
t.Error("src and dst3 mismatch")
}
}

196
multireader.go Normal file
View File

@ -0,0 +1,196 @@
package multireader
import (
"errors"
"io"
"sort"
"sync"
)
var (
ErrMultiReaderClosed = errors.New("multireader closed")
ErrReaderClosed = errors.New("reader closed")
ErrOutOfBound = errors.New("seek beyond writer")
ErrUnsupportedWhence = errors.New("unsupported whence")
)
type MultiReader struct {
mtx *sync.Mutex
cond *sync.Cond
buffer []byte
readers []*Reader
pos []*Reader
lastWritePos int
closed bool
}
type Reader struct {
multiReader *MultiReader
mtx *sync.Mutex
cond *sync.Cond
id int
readPos int
closed bool
}
func NewMultiReader(len int) *MultiReader {
if len < 0 {
len = 0
}
buff := make([]byte, len)
mtx := &sync.Mutex{}
return &MultiReader{
mtx: mtx,
cond: sync.NewCond(mtx),
buffer: buff,
readers: make([]*Reader, 0),
pos: make([]*Reader, 0),
lastWritePos: 0,
closed: false,
}
}
func (m *MultiReader) Write(data []byte) (int, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
defer m.cond.Broadcast()
var (
tmpBuff []byte
)
if m.closed {
return 0, ErrMultiReaderClosed
}
if m.lastWritePos+len(data) > len(m.buffer) {
tmpBuff = make([]byte, m.lastWritePos+len(data)-len(m.buffer))
m.buffer = append(m.buffer, tmpBuff...)
}
copy(m.buffer[m.lastWritePos:], data)
m.lastWritePos += len(data)
return len(data), nil
}
func (m *MultiReader) sortLastPos() {
sort.Slice(m.pos, func(i, j int) bool {
return m.pos[i].readPos < m.pos[j].readPos
})
}
func (m *MultiReader) shift() {
shiftPos := m.pos[0].readPos
if shiftPos > 0 {
for _, r := range m.pos {
r.readPos -= shiftPos
}
copy(m.buffer, m.buffer[shiftPos:shiftPos+(m.lastWritePos-shiftPos)])
m.lastWritePos -= shiftPos
}
}
func (m *MultiReader) Close() error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.closed = true
return nil
}
func (m *MultiReader) NewReader() *Reader {
m.mtx.Lock()
defer m.mtx.Unlock()
reader := &Reader{
multiReader: m,
mtx: m.mtx,
cond: m.cond,
id: len(m.readers),
readPos: 0,
}
m.readers = append(m.readers, reader)
m.pos = append(m.pos, reader)
m.sortLastPos()
return reader
}
func (r *Reader) Read(data []byte) (int, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
defer r.cond.Broadcast()
var (
n int
err error
)
if r.closed {
return 0, ErrReaderClosed
}
for r.readPos >= r.multiReader.lastWritePos {
if r.multiReader.closed {
return 0, ErrMultiReaderClosed
}
r.cond.Wait()
}
n, err = r.readAhead(data)
_, _ = r.seek(n, io.SeekCurrent)
return n, err
}
func (r *Reader) seek(offset int, whence int) (int, error) {
if offset > r.multiReader.lastWritePos || offset < 0 {
return r.readPos, ErrOutOfBound
}
switch whence {
case io.SeekCurrent:
r.readPos += offset
case io.SeekStart:
r.readPos = offset
case io.SeekEnd:
r.readPos = r.multiReader.lastWritePos - offset
}
r.multiReader.sortLastPos()
r.multiReader.shift()
return r.readPos, nil
}
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
n, err := r.seek(int(offset), whence)
return int64(n), err
}
func (r *Reader) readAhead(data []byte) (int, error) {
var err error
if r.readPos+len(data) > r.multiReader.lastWritePos {
copy(data, r.multiReader.buffer[r.readPos:r.multiReader.lastWritePos])
if r.multiReader.closed {
err = ErrMultiReaderClosed
}
return r.multiReader.lastWritePos - r.readPos, err
}
copy(data, r.multiReader.buffer[r.readPos:r.readPos+len(data)])
return len(data), nil
}
func (r *Reader) ReadAhead(data []byte) (int, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.readAhead(data)
}
func (r *Reader) Close() error {
r.mtx.Lock()
defer r.mtx.Unlock()
r.closed = true
return nil
}