WIP: random message to generate test data, looks good

This commit is contained in:
2025-05-19 18:19:45 +10:00
parent 06d39158a5
commit 17de302019
5 changed files with 355 additions and 6 deletions

View File

@@ -0,0 +1,70 @@
package randommessage
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"encoding/binary"
"errors"
"hash/crc32"
"io"
)
type ChunkedMessage struct {
seq uint32
sr *StreamReader
}
func NewChunkedReader(size int64) (*ChunkedMessage, error) {
sr, err := NewStreamReader(size)
if err != nil {
return nil, err
}
return &ChunkedMessage{
seq: 0,
sr: sr,
}, nil
}
func (c *ChunkedMessage) Read(buf []byte) (int, error) {
if len(buf) < 8 {
return 0, ErrInvalidBufSize
}
var (
n int
err error
)
limit := len(buf) - 4
binary.BigEndian.PutUint32(buf, c.seq)
if n, err = c.sr.Read(buf[4:limit]); err != nil && !errors.Is(err, io.EOF) {
return n, err
}
if n == 0 {
return n, err
}
if n < limit-4 {
limit = n + 4
}
binary.BigEndian.PutUint32(buf[limit:], crc32.Checksum(buf[:limit], crc32.MakeTable(crc32.Castagnoli)))
c.seq++
return n + 8, err
}

View File

@@ -17,6 +17,7 @@ package randommessage
*/
import (
"bytes"
"crypto/rand"
"encoding/binary"
"errors"
@@ -30,10 +31,14 @@ type StreamReader struct {
pos int64
size int64
hashEngine hash.Hash
sum []byte
hashPos int
}
var (
ErrInvalidBufSize = errors.New("invalid buffer size")
ErrInvalidBufSize = errors.New("invalid buffer size")
ErrInvalidTrailingLength = errors.New("invalid trailing length")
ErrMessageVerification = errors.New("message verification failed")
)
func NewStreamReader(size int64) (*StreamReader, error) {
@@ -49,6 +54,8 @@ func NewStreamReader(size int64) (*StreamReader, error) {
pos: 0,
size: size,
hashEngine: h,
sum: nil,
hashPos: 0,
}, nil
}
@@ -70,6 +77,25 @@ func (s *StreamReader) Read(buf []byte) (int, error) {
n, err = s.read(buf)
}
if err != nil && errors.Is(err, io.EOF) {
if s.hashPos > 0 && s.hashPos < blake2s.Size {
n += copy(buf[n:], s.sum[s.hashPos:])
s.hashPos += n
if s.hashPos < blake2s.Size {
err = nil
}
} else if s.hashPos == 0 {
s.sum = s.hashEngine.Sum(nil)
if n < len(buf) {
s.hashPos = copy(buf[n:], s.sum[s.hashPos:])
if s.hashPos < blake2s.Size {
err = nil
}
n += s.hashPos
}
}
}
return n, err
}
@@ -98,3 +124,74 @@ func (s *StreamReader) read(buf []byte) (int, error) {
return n, err
}
type MessageChecker struct {
expectedLen int64
hashEngine hash.Hash
pos int64
buffer *bytes.Buffer
}
func NewMessageChecker() (*MessageChecker, error) {
var (
h hash.Hash
err error
)
if h, err = blake2s.New256(nil); err != nil {
return nil, err
}
return &MessageChecker{
hashEngine: h,
expectedLen: -1,
pos: 0,
buffer: bytes.NewBuffer(nil),
}, nil
}
func (m *MessageChecker) Write(buf []byte) (int, error) {
var (
n, limit int
)
n = 0
if m.expectedLen == -1 {
if len(buf) < 4 {
return 0, ErrInvalidBufSize
}
m.expectedLen = int64(binary.BigEndian.Uint32(buf))
n = 4
}
if m.expectedLen > 0 {
limit = len(buf)
if m.pos < m.expectedLen {
if m.pos+int64(limit-n) > m.expectedLen {
limit = int(int64(limit) - (m.pos + int64(limit-n) - m.expectedLen))
m.buffer.Write(buf[limit:])
}
m.hashEngine.Write(buf[n:limit])
m.pos += int64(limit - n)
} else {
m.buffer.Write(buf[n:limit])
m.pos += int64(limit - n)
}
}
return len(buf), nil
}
func (m *MessageChecker) Close() error {
buf := m.buffer.Bytes()
if len(buf) != blake2s.Size {
return ErrInvalidTrailingLength
}
sum := m.hashEngine.Sum(nil)
if !bytes.Equal(sum, buf) {
return ErrMessageVerification
}
return nil
}

View File

@@ -0,0 +1,183 @@
package randommessage
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"bytes"
"encoding/binary"
"errors"
"hash"
"hash/crc32"
"io"
"testing"
"gitea.suyono.dev/suyono/go-sizes/sizes"
"golang.org/x/crypto/blake2s"
)
func TestStreamReader(t *testing.T) {
var (
n int
h hash.Hash
testSize int64 = int64(16 * sizes.KibiByte)
mc *MessageChecker
)
sr, err := NewStreamReader(testSize)
if err != nil {
t.Fatalf("failed to initialize stream: %v", err)
}
buf := make([]byte, (10 * sizes.KibiByte).MustInt())
if n, err = sr.Read(buf); err != nil {
t.Fatalf("failed to read from stream: %v", err)
}
if int64(binary.BigEndian.Uint32(buf)) != testSize {
t.Fatal("test size failed")
}
if mc, err = NewMessageChecker(); err != nil {
t.Fatalf("failed to instantiate Message Checker: %v", err)
}
h, err = blake2s.New256(nil)
if err != nil {
t.Fatalf("failed to instanciate blake2s: %v", err)
}
h.Write(buf[4:n])
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if n, err = sr.Read(buf); err != nil {
if !errors.Is(err, io.EOF) {
t.Fatalf("failed to read from stream: %v", err)
}
}
h.Write(buf[:n-32])
sum := h.Sum(nil)
if !bytes.Equal(buf[n-32:n], sum) {
t.Fatal("mismatch hash")
}
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if err = mc.Close(); err != nil {
t.Fatalf("message checker: %v", err)
}
}
func TestStreamReaderCasePartialHash(t *testing.T) {
var (
n int
testSize int64 = int64(16 * sizes.KibiByte)
mc *MessageChecker
)
sr, err := NewStreamReader(testSize)
if err != nil {
t.Fatalf("failed to initialize stream: %v", err)
}
buf := make([]byte, (10 * sizes.KibiByte).MustInt())
if n, err = sr.Read(buf); err != nil {
t.Fatalf("failed to read from stream: %v", err)
}
if int64(binary.BigEndian.Uint32(buf)) != testSize {
t.Fatal("test size failed")
}
if mc, err = NewMessageChecker(); err != nil {
t.Fatalf("failed to instantiate Message Checker: %v", err)
}
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if n, err = sr.Read(buf[:6170]); err != nil {
t.Fatalf("failed to read from stream: %v", err)
}
var m int
if m, err = sr.Read(buf[n:]); err != nil {
if !errors.Is(err, io.EOF) {
t.Fatalf("failed to read from stream: %v", err)
}
}
n += m
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if err = mc.Close(); err != nil {
t.Fatalf("message checker: %v", err)
}
}
func TestChunkedMessage(t *testing.T) {
var (
cm *ChunkedMessage
err error
n int
crc uint32
h hash.Hash
)
if cm, err = NewChunkedReader(int64(16 * sizes.KibiByte)); err != nil {
t.Fatalf("failed to instantiate chunked reader: %v", err)
}
if h, err = blake2s.New256(nil); err != nil {
t.Fatalf("failed to instantiate blake2s: %v", err)
}
buf := make([]byte, sizes.KibiByte.MustInt())
for i := range 16 {
if n, err = cm.Read(buf); err != nil {
t.Fatalf("failed to read: %v", err)
}
crc = binary.BigEndian.Uint32(buf[n-4:])
if crc != crc32.Checksum(buf[:n-4], crc32.MakeTable(crc32.Castagnoli)) {
t.Fatalf("mismatch checksum")
}
if i == 0 {
h.Write(buf[8 : n-4])
} else {
h.Write(buf[4 : n-4])
}
}
if n, err = cm.Read(buf); err != nil && !errors.Is(err, io.EOF) {
t.Fatalf("failed to read: %v", err)
}
if err == nil {
t.Fatal("unexpected nil error")
}
crc = binary.BigEndian.Uint32(buf[n-4:])
if crc != crc32.Checksum(buf[:n-4], crc32.MakeTable(crc32.Castagnoli)) {
t.Fatalf("mismatch checksum")
}
h.Write(buf[4 : n-36])
if !bytes.Equal(h.Sum(nil), buf[n-36:n-4]) {
t.Fatal("mismatch hash")
}
}