Compare commits

..

3 Commits

12 changed files with 934 additions and 380 deletions

View File

@@ -1,36 +0,0 @@
package slicewriter
import "io"
type SliceWriter struct {
off int
buf []byte
}
func (w *SliceWriter) Write(p []byte) (int, error) {
if len(p)+w.off <= len(w.buf) {
copy(w.buf[w.off:], p)
w.off += len(p)
return len(p), nil
}
space := len(w.buf) - w.off
copy(w.buf[:w.off], p[:space])
w.off += space
return space, io.ErrShortWrite
}
func (w *SliceWriter) Reset() {
w.off = 0
}
func (w *SliceWriter) Bytes() []byte {
return w.buf[:w.off]
}
func NewSliceWriter(b []byte) *SliceWriter {
return &SliceWriter{
off: 0,
buf: b,
}
}

View File

@@ -18,185 +18,64 @@ package main
import (
"context"
"fmt"
"net"
"os/signal"
"time"
"golang.org/x/sys/unix"
"gitea.suyono.dev/suyono/netbounce/cmd/slicewriter"
"gitea.suyono.dev/suyono/netbounce/testlib/client"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
var gLimit *counter
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), unix.SIGINT, unix.SIGTERM)
defer cancel()
parseFlags()
log.Debug().Caller().Msgf("Sending messages to %s server: %s", viper.GetString(PROTOCOL), viper.GetString(SERVER))
sendMessages(ctx)
}
const (
SERVER = "server"
PROTOCOL = "protocol"
UDP = "udp"
TCP = "tcp"
NAME = "name"
MESSAGE = "message"
SLEEP = "sleep"
READTIMEOUT = "read-timeout"
)
func sendMessages(ctx context.Context) {
switch viper.GetString(PROTOCOL) {
case "udp":
sendUDP(ctx)
case "tcp":
sendTCP(ctx)
default:
log.Fatal().Caller().Str(PROTOCOL, viper.GetString(PROTOCOL)).Msg("Unknown protocol")
}
}
func sendTCP(ctx context.Context) {
var (
conn net.Conn
err error
buf, b []byte
n int
)
if conn, err = net.Dial(TCP, viper.GetString(SERVER)); err != nil {
log.Fatal().Caller().Err(err).Msg("Failed to connect to server")
}
defer func() {
_ = conn.Close()
}()
go func() {
<-ctx.Done()
_ = conn.Close()
}()
buf = make([]byte, 4096)
for gLimit.isContinue(ctx) {
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "client %s | %v | %s", viper.GetString(NAME), time.Now(), viper.GetString(MESSAGE)); err != nil {
log.Fatal().Caller().Err(err).Msg("Failed to build client message")
}
b = sb.Bytes()
if _, err = conn.Write(b); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msg("Failed to send client message")
}
if err = conn.SetReadDeadline(time.Now().Add(viper.GetDuration(READTIMEOUT))); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msg("Failed to send client message")
}
if n, err = conn.Read(buf); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msg("read from the server")
}
log.Info().Caller().Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msgf("%s", buf[:n])
time.Sleep(viper.GetDuration(SLEEP))
}
}
func sendUDP(ctx context.Context) {
var (
addr, laddr *net.UDPAddr
conn *net.UDPConn
rAddr net.Addr
err error
buf, b []byte
n int
)
if addr, err = net.ResolveUDPAddr(UDP, viper.GetString(SERVER)); err != nil {
log.Fatal().Caller().Err(err).Str(SERVER, viper.GetString(SERVER)).Msg("udp resolve server address")
}
if laddr, err = net.ResolveUDPAddr(UDP, ""); err != nil {
log.Fatal().Caller().Err(err).Str(SERVER, viper.GetString(SERVER)).Msg("udp resolve local/self address")
}
log.Info().Str(SERVER, viper.GetString(SERVER)).Msgf("bound address %v", laddr)
// In Go, binding address and port for UDP use ListenUDP. Confusing!!
if conn, err = net.ListenUDP(UDP, laddr); err != nil {
log.Fatal().Caller().Err(err).Str(SERVER, viper.GetString(SERVER)).Msg("fail to bind local/self address")
}
defer func() {
_ = conn.Close()
}()
go func() {
<-ctx.Done()
_ = conn.Close()
}()
buf = make([]byte, 4096)
for gLimit.isContinue(ctx) {
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "client %s | %v | %s", viper.GetString(NAME), time.Now(), viper.GetString(MESSAGE)); err != nil {
log.Fatal().Caller().Err(err).Msg("Failed to build client message")
}
b = sb.Bytes()
if _, err = conn.WriteTo(b, addr); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, UDP).Str(SERVER, viper.GetString(SERVER)).Msg("Failed to send client message")
}
if err = conn.SetReadDeadline(time.Now().Add(viper.GetDuration(READTIMEOUT))); err != nil {
log.Error().Caller().Err(err).Str(PROTOCOL, UDP).Str(SERVER, viper.GetString(SERVER)).Msg("set read timeout on the socket")
}
if n, rAddr, err = conn.ReadFrom(b); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, UDP).Str(SERVER, viper.GetString(SERVER)).Msg("read from server")
}
log.Info().Caller().Str(PROTOCOL, UDP).Str(SERVER, rAddr.String()).Msgf("%s", buf[:n])
}
log.Debug().Caller().Msgf("Sending messages to %s server: %s", viper.GetString(client.PROTOCOL), viper.GetString(client.SERVER))
// sendMessages(ctx)
client.SendMessages(ctx)
}
func parseFlags() {
pflag.String(NAME, "", "client name")
pflag.String(client.NAME, "", "client name")
pflag.IntP("number", "n", 5, "number of messages to send; default 5; set to 0 for infinite")
pflag.Duration(READTIMEOUT, 5*time.Second, "read timeout; default 5 seconds")
pflag.DurationP(SLEEP, "s", 10*time.Millisecond, "sleep time between requests; default 10ms")
pflag.StringP(MESSAGE, "m", "message from client", "message to send")
pflag.Duration(client.READTIMEOUT, 5*time.Second, "read timeout; default 5 seconds")
pflag.DurationP(client.SLEEP, "s", 10*time.Millisecond, "sleep time between requests; default 10ms")
pflag.StringP(client.MESSAGE, "m", "message from client", "message to send")
pflag.Bool("debug", false, "run in debug mode")
pflag.String(TCP, "", "tcp server address")
pflag.String(UDP, "", "udp server address")
pflag.String(client.TCP, "", "tcp server address")
pflag.String(client.UDP, "", "udp server address")
pflag.Parse()
_ = viper.BindPFlags(pflag.CommandLine)
gLimit = makeCounter(viper.GetInt("number"))
// gLimit = makeCounter(viper.GetInt("number"))
client.InitLimit()
if !viper.IsSet(NAME) {
if !viper.IsSet(client.NAME) {
log.Fatal().Caller().Msg("server name is required")
}
if viper.IsSet(TCP) && viper.IsSet(UDP) {
if viper.IsSet(client.TCP) && viper.IsSet(client.UDP) {
log.Fatal().Caller().Msg("cannot use tcp and udp at once")
}
if !viper.IsSet(TCP) && !viper.IsSet(UDP) {
if !viper.IsSet(client.TCP) && !viper.IsSet(client.UDP) {
log.Fatal().Caller().Msg("--tcp or --udp is required, use one of them not both.")
}
if viper.IsSet(TCP) {
viper.Set(PROTOCOL, TCP)
viper.Set(SERVER, viper.GetString(TCP))
if viper.IsSet(client.TCP) {
viper.Set(client.PROTOCOL, client.TCP)
viper.Set(client.SERVER, viper.GetString(client.TCP))
}
if viper.IsSet(UDP) {
viper.Set(PROTOCOL, UDP)
viper.Set(SERVER, viper.GetString(UDP))
if viper.IsSet(client.UDP) {
viper.Set(client.PROTOCOL, client.UDP)
viper.Set(client.SERVER, viper.GetString(client.UDP))
}
zerolog.SetGlobalLevel(zerolog.InfoLevel)
@@ -204,30 +83,3 @@ func parseFlags() {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
}
type counter struct {
limit, tick int
}
func makeCounter(limit int) *counter {
if limit <= 0 {
log.Fatal().Msg("number must be > 0")
}
return &counter{limit: limit, tick: -1}
}
func (c *counter) isContinue(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
default:
}
if c.limit == 0 {
return true
}
c.tick++
return c.tick < c.limit
}

View File

@@ -18,13 +18,10 @@ package main
import (
"context"
"fmt"
"net"
"os/signal"
"sync"
"time"
"gitea.suyono.dev/suyono/netbounce/cmd/slicewriter"
"gitea.suyono.dev/suyono/netbounce/testlib/server"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/pflag"
@@ -41,7 +38,7 @@ func main() {
defer cancel()
wg := &sync.WaitGroup{}
openPorts(wg, ctx)
server.OpenPorts(wg, ctx)
wg.Wait()
@@ -71,167 +68,3 @@ func parseFlags() {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
}
func openPorts(wg *sync.WaitGroup, ctx context.Context) {
tcpPorts := viper.GetStringSlice("tcp")
for _, tcpPort := range tcpPorts {
wg.Add(1)
go listen(ctx, wg, tcpPort)
}
udpPorts := viper.GetStringSlice("udp")
for _, udpPort := range udpPorts {
wg.Add(1)
go bindUDP(ctx, wg, udpPort)
}
}
func ClosePacket(ctx context.Context, conn net.PacketConn) {
<-ctx.Done()
_ = conn.Close()
}
func bindUDP(ctx context.Context, wg *sync.WaitGroup, address string) {
defer wg.Done()
var (
conn net.PacketConn
err error
buf, b []byte
n int
addr net.Addr
)
log.Debug().Caller().Msgf("binding socket for UDP on %v", address)
if conn, err = net.ListenPacket("udp", address); err != nil {
panic(fmt.Errorf("failed to bind udp address: %v", err))
}
go ClosePacket(ctx, conn)
buf = make([]byte, 4096)
udpLoop:
for {
if n, addr, err = conn.ReadFrom(buf); err != nil && n == 0 {
select {
case <-ctx.Done():
break udpLoop
default:
}
log.Error().Err(err).Msg("failed to read packet")
continue udpLoop
}
log.Info().Str("client", addr.String()).Msgf("received message: %s", buf[:n])
select {
case <-ctx.Done():
break udpLoop
default:
}
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "server: %s | UDP | %v", viper.GetString("name"), time.Now()); err != nil {
log.Error().Err(err).Msg("build server message")
}
b = sb.Bytes()
if n, err = conn.WriteTo(b, addr); err != nil {
select {
case <-ctx.Done():
break udpLoop
default:
}
log.Error().Err(err).Str("client", addr.String()).Msg("failed to write packet")
continue udpLoop
}
if n != len(b) {
log.Debug().Str("client", addr.String()).Msg("incomplete packet sent")
}
log.Info().Str("client", addr.String()).Msg("packet received and replied")
}
}
func CloseListener(ctx context.Context, listener net.Listener) {
<-ctx.Done()
_ = listener.Close()
}
func listen(ctx context.Context, wg *sync.WaitGroup, address string) {
defer wg.Done()
var (
listener net.Listener
err error
conn net.Conn
)
log.Debug().Caller().Msgf("listeng for TCP on %v", address)
if listener, err = net.Listen("tcp", address); err != nil {
log.Error().Err(err).Str("address", address).Msg("failed to listen")
return
}
go CloseListener(ctx, listener)
tcpIncoming:
for {
if conn, err = listener.Accept(); err != nil {
select {
case <-ctx.Done():
break tcpIncoming
default:
}
log.Error().Err(err).Str("address", address).Msg("failed to accept connection")
continue tcpIncoming
}
wg.Add(1)
go handleTCP(ctx, wg, conn)
}
}
func CloseConnection(ctx context.Context, conn net.Conn) {
<-ctx.Done()
_ = conn.Close()
}
func handleTCP(ctx context.Context, wg *sync.WaitGroup, conn net.Conn) {
defer wg.Done()
defer func() {
_ = conn.Close()
}()
var (
buf, b []byte
err error
n int
)
buf = make([]byte, 4096)
addr := conn.RemoteAddr()
cctx, cancel := context.WithCancel(ctx)
go CloseConnection(cctx, conn)
defer cancel()
for {
if n, err = conn.Read(buf); err != nil {
log.Error().Err(err).Str("client", addr.String()).Msg("failed to read data from TCP connection")
return
}
log.Info().Str("client", addr.String()).Msgf("received message: %s", buf[:n])
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "server: %s | TCP | %v", viper.GetString("name"), time.Now()); err != nil {
log.Error().Err(err).Msg("build server message")
}
b = sb.Bytes()
if _, err = conn.Write(b); err != nil {
log.Error().Err(err).Str("client", addr.String()).Msg("failed to write data to TCP connection")
return
}
}
}

6
go.mod
View File

@@ -3,10 +3,12 @@ module gitea.suyono.dev/suyono/netbounce
go 1.24
require (
gitea.suyono.dev/suyono/go-sizes v0.1.2
github.com/rs/zerolog v1.34.0
github.com/spf13/pflag v1.0.6
github.com/spf13/viper v1.20.1
golang.org/x/sys v0.32.0
golang.org/x/crypto v0.38.0
golang.org/x/sys v0.33.0
)
require (
@@ -22,6 +24,6 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/text v0.25.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

12
go.sum
View File

@@ -1,3 +1,5 @@
gitea.suyono.dev/suyono/go-sizes v0.1.2 h1:uXViVtTh8/mrJrrv7ApBMIOD4/0dz+Z1Y2R2anqklhU=
gitea.suyono.dev/suyono/go-sizes v0.1.2/go.mod h1:v9jk4b+wlIUkYoT6KwoLHfE/EpFefey6c/WR51liI98=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -54,13 +56,15 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -1,5 +1,21 @@
package slicewriter
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"sync"
"testing"

View File

@@ -0,0 +1,52 @@
package slicewriter
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import "io"
type SliceWriter struct {
off int
buf []byte
}
func (w *SliceWriter) Write(p []byte) (int, error) {
if len(p)+w.off <= len(w.buf) {
copy(w.buf[w.off:], p)
w.off += len(p)
return len(p), nil
}
space := len(w.buf) - w.off
copy(w.buf[:w.off], p[:space])
w.off += space
return space, io.ErrShortWrite
}
func (w *SliceWriter) Reset() {
w.off = 0
}
func (w *SliceWriter) Bytes() []byte {
return w.buf[:w.off]
}
func NewSliceWriter(b []byte) *SliceWriter {
return &SliceWriter{
off: 0,
buf: b,
}
}

184
testlib/client/client.go Normal file
View File

@@ -0,0 +1,184 @@
package client
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"fmt"
"net"
"time"
"gitea.suyono.dev/suyono/netbounce/slicewriter"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
const (
SERVER = "server"
PROTOCOL = "protocol"
UDP = "udp"
TCP = "tcp"
NAME = "name"
MESSAGE = "message"
SLEEP = "sleep"
READTIMEOUT = "read-timeout"
)
var gLimit *counter
func SendMessages(ctx context.Context) {
sendMessages(ctx)
}
func InitLimit() {
gLimit = makeCounter(viper.GetInt("number"))
}
func sendMessages(ctx context.Context) {
switch viper.GetString(PROTOCOL) {
case "udp":
sendUDP(ctx)
case "tcp":
sendTCP(ctx)
default:
log.Fatal().Caller().Str(PROTOCOL, viper.GetString(PROTOCOL)).Msg("Unknown protocol")
}
}
func sendTCP(ctx context.Context) {
var (
conn net.Conn
err error
buf, b []byte
n int
)
if conn, err = net.Dial(TCP, viper.GetString(SERVER)); err != nil {
log.Fatal().Caller().Err(err).Msg("Failed to connect to server")
}
defer func() {
_ = conn.Close()
}()
go func() {
<-ctx.Done()
_ = conn.Close()
}()
buf = make([]byte, 4096)
for gLimit.isContinue(ctx) {
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "client %s | %v | %s", viper.GetString(NAME), time.Now(), viper.GetString(MESSAGE)); err != nil {
log.Fatal().Caller().Err(err).Msg("Failed to build client message")
}
b = sb.Bytes()
if _, err = conn.Write(b); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msg("Failed to send client message")
}
if err = conn.SetReadDeadline(time.Now().Add(viper.GetDuration(READTIMEOUT))); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msg("Failed to send client message")
}
if n, err = conn.Read(buf); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msg("read from the server")
}
log.Info().Caller().Str(PROTOCOL, TCP).Str(SERVER, viper.GetString(SERVER)).Msgf("%s", buf[:n])
time.Sleep(viper.GetDuration(SLEEP))
}
}
func sendUDP(ctx context.Context) {
var (
addr, laddr *net.UDPAddr
conn *net.UDPConn
rAddr net.Addr
err error
buf, b []byte
n int
)
if addr, err = net.ResolveUDPAddr(UDP, viper.GetString(SERVER)); err != nil {
log.Fatal().Caller().Err(err).Str(SERVER, viper.GetString(SERVER)).Msg("udp resolve server address")
}
if laddr, err = net.ResolveUDPAddr(UDP, ""); err != nil {
log.Fatal().Caller().Err(err).Str(SERVER, viper.GetString(SERVER)).Msg("udp resolve local/self address")
}
log.Info().Str(SERVER, viper.GetString(SERVER)).Msgf("bound address %v", laddr)
// In Go, binding address and port for UDP use ListenUDP. Confusing!!
if conn, err = net.ListenUDP(UDP, laddr); err != nil {
log.Fatal().Caller().Err(err).Str(SERVER, viper.GetString(SERVER)).Msg("fail to bind local/self address")
}
defer func() {
_ = conn.Close()
}()
go func() {
<-ctx.Done()
_ = conn.Close()
}()
buf = make([]byte, 4096)
for gLimit.isContinue(ctx) {
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "client %s | %v | %s", viper.GetString(NAME), time.Now(), viper.GetString(MESSAGE)); err != nil {
log.Fatal().Caller().Err(err).Msg("Failed to build client message")
}
b = sb.Bytes()
if _, err = conn.WriteTo(b, addr); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, UDP).Str(SERVER, viper.GetString(SERVER)).Msg("Failed to send client message")
}
if err = conn.SetReadDeadline(time.Now().Add(viper.GetDuration(READTIMEOUT))); err != nil {
log.Error().Caller().Err(err).Str(PROTOCOL, UDP).Str(SERVER, viper.GetString(SERVER)).Msg("set read timeout on the socket")
}
if n, rAddr, err = conn.ReadFrom(b); err != nil {
log.Fatal().Caller().Err(err).Str(PROTOCOL, UDP).Str(SERVER, viper.GetString(SERVER)).Msg("read from server")
}
log.Info().Caller().Str(PROTOCOL, UDP).Str(SERVER, rAddr.String()).Msgf("%s", buf[:n])
}
}
type counter struct {
limit, tick int
}
func makeCounter(limit int) *counter {
if limit <= 0 {
log.Fatal().Msg("number must be > 0")
}
return &counter{limit: limit, tick: -1}
}
func (c *counter) isContinue(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
default:
}
if c.limit == 0 {
return true
}
c.tick++
return c.tick < c.limit
}

View File

@@ -0,0 +1,70 @@
package randommessage
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"encoding/binary"
"errors"
"hash/crc32"
"io"
)
type ChunkedMessage struct {
seq uint32
sr *StreamReader
}
func NewChunkedReader(size int64) (*ChunkedMessage, error) {
sr, err := NewStreamReader(size)
if err != nil {
return nil, err
}
return &ChunkedMessage{
seq: 0,
sr: sr,
}, nil
}
func (c *ChunkedMessage) Read(buf []byte) (int, error) {
if len(buf) < 8 {
return 0, ErrInvalidBufSize
}
var (
n int
err error
)
limit := len(buf) - 4
binary.BigEndian.PutUint32(buf, c.seq)
if n, err = c.sr.Read(buf[4:limit]); err != nil && !errors.Is(err, io.EOF) {
return n, err
}
if n == 0 {
return n, err
}
if n < limit-4 {
limit = n + 4
}
binary.BigEndian.PutUint32(buf[limit:], crc32.Checksum(buf[:limit], crc32.MakeTable(crc32.Castagnoli)))
c.seq++
return n + 8, err
}

View File

@@ -0,0 +1,197 @@
package randommessage
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"bytes"
"crypto/rand"
"encoding/binary"
"errors"
"hash"
"io"
"golang.org/x/crypto/blake2s"
)
type StreamReader struct {
pos int64
size int64
hashEngine hash.Hash
sum []byte
hashPos int
}
var (
ErrInvalidBufSize = errors.New("invalid buffer size")
ErrInvalidTrailingLength = errors.New("invalid trailing length")
ErrMessageVerification = errors.New("message verification failed")
)
func NewStreamReader(size int64) (*StreamReader, error) {
var (
h hash.Hash
err error
)
if h, err = blake2s.New256(nil); err != nil {
return nil, err
}
return &StreamReader{
pos: 0,
size: size,
hashEngine: h,
sum: nil,
hashPos: 0,
}, nil
}
func (s *StreamReader) Read(buf []byte) (int, error) {
var (
n int
err error
)
if s.pos == 0 {
if len(buf) < 4 {
return 0, ErrInvalidBufSize
}
binary.BigEndian.PutUint32(buf, uint32(s.size))
n, err = s.read(buf[4:])
n += 4
} else {
n, err = s.read(buf)
}
if err != nil && errors.Is(err, io.EOF) {
if s.hashPos > 0 && s.hashPos < blake2s.Size {
n += copy(buf[n:], s.sum[s.hashPos:])
s.hashPos += n
if s.hashPos < blake2s.Size {
err = nil
}
} else if s.hashPos == 0 {
s.sum = s.hashEngine.Sum(nil)
if n < len(buf) {
s.hashPos = copy(buf[n:], s.sum[s.hashPos:])
if s.hashPos < blake2s.Size {
err = nil
}
n += s.hashPos
}
}
}
return n, err
}
func (s *StreamReader) read(buf []byte) (int, error) {
var (
n int
limit int
err error
)
limit = len(buf)
err = nil
if s.size-s.pos < int64(len(buf)) {
err = io.EOF
limit = int(s.size - s.pos)
if limit < 0 {
panic("unexpected limit")
}
}
if limit > 0 {
n, _ = rand.Read(buf[:limit])
s.pos += int64(limit)
s.hashEngine.Write(buf[:limit])
}
return n, err
}
type MessageChecker struct {
expectedLen int64
hashEngine hash.Hash
pos int64
buffer *bytes.Buffer
}
func NewMessageChecker() (*MessageChecker, error) {
var (
h hash.Hash
err error
)
if h, err = blake2s.New256(nil); err != nil {
return nil, err
}
return &MessageChecker{
hashEngine: h,
expectedLen: -1,
pos: 0,
buffer: bytes.NewBuffer(nil),
}, nil
}
func (m *MessageChecker) Write(buf []byte) (int, error) {
var (
n, limit int
)
n = 0
if m.expectedLen == -1 {
if len(buf) < 4 {
return 0, ErrInvalidBufSize
}
m.expectedLen = int64(binary.BigEndian.Uint32(buf))
n = 4
}
if m.expectedLen > 0 {
limit = len(buf)
if m.pos < m.expectedLen {
if m.pos+int64(limit-n) > m.expectedLen {
limit = int(int64(limit) - (m.pos + int64(limit-n) - m.expectedLen))
m.buffer.Write(buf[limit:])
}
m.hashEngine.Write(buf[n:limit])
m.pos += int64(limit - n)
} else {
m.buffer.Write(buf[n:limit])
m.pos += int64(limit - n)
}
}
return len(buf), nil
}
func (m *MessageChecker) Close() error {
buf := m.buffer.Bytes()
if len(buf) != blake2s.Size {
return ErrInvalidTrailingLength
}
sum := m.hashEngine.Sum(nil)
if !bytes.Equal(sum, buf) {
return ErrMessageVerification
}
return nil
}

View File

@@ -0,0 +1,183 @@
package randommessage
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"bytes"
"encoding/binary"
"errors"
"hash"
"hash/crc32"
"io"
"testing"
"gitea.suyono.dev/suyono/go-sizes/sizes"
"golang.org/x/crypto/blake2s"
)
func TestStreamReader(t *testing.T) {
var (
n int
h hash.Hash
testSize int64 = int64(16 * sizes.KibiByte)
mc *MessageChecker
)
sr, err := NewStreamReader(testSize)
if err != nil {
t.Fatalf("failed to initialize stream: %v", err)
}
buf := make([]byte, (10 * sizes.KibiByte).MustInt())
if n, err = sr.Read(buf); err != nil {
t.Fatalf("failed to read from stream: %v", err)
}
if int64(binary.BigEndian.Uint32(buf)) != testSize {
t.Fatal("test size failed")
}
if mc, err = NewMessageChecker(); err != nil {
t.Fatalf("failed to instantiate Message Checker: %v", err)
}
h, err = blake2s.New256(nil)
if err != nil {
t.Fatalf("failed to instanciate blake2s: %v", err)
}
h.Write(buf[4:n])
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if n, err = sr.Read(buf); err != nil {
if !errors.Is(err, io.EOF) {
t.Fatalf("failed to read from stream: %v", err)
}
}
h.Write(buf[:n-32])
sum := h.Sum(nil)
if !bytes.Equal(buf[n-32:n], sum) {
t.Fatal("mismatch hash")
}
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if err = mc.Close(); err != nil {
t.Fatalf("message checker: %v", err)
}
}
func TestStreamReaderCasePartialHash(t *testing.T) {
var (
n int
testSize int64 = int64(16 * sizes.KibiByte)
mc *MessageChecker
)
sr, err := NewStreamReader(testSize)
if err != nil {
t.Fatalf("failed to initialize stream: %v", err)
}
buf := make([]byte, (10 * sizes.KibiByte).MustInt())
if n, err = sr.Read(buf); err != nil {
t.Fatalf("failed to read from stream: %v", err)
}
if int64(binary.BigEndian.Uint32(buf)) != testSize {
t.Fatal("test size failed")
}
if mc, err = NewMessageChecker(); err != nil {
t.Fatalf("failed to instantiate Message Checker: %v", err)
}
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if n, err = sr.Read(buf[:6170]); err != nil {
t.Fatalf("failed to read from stream: %v", err)
}
var m int
if m, err = sr.Read(buf[n:]); err != nil {
if !errors.Is(err, io.EOF) {
t.Fatalf("failed to read from stream: %v", err)
}
}
n += m
if _, err = mc.Write(buf[:n]); err != nil {
t.Fatalf("write to Message Checker failed: %v", err)
}
if err = mc.Close(); err != nil {
t.Fatalf("message checker: %v", err)
}
}
func TestChunkedMessage(t *testing.T) {
var (
cm *ChunkedMessage
err error
n int
crc uint32
h hash.Hash
)
if cm, err = NewChunkedReader(int64(16 * sizes.KibiByte)); err != nil {
t.Fatalf("failed to instantiate chunked reader: %v", err)
}
if h, err = blake2s.New256(nil); err != nil {
t.Fatalf("failed to instantiate blake2s: %v", err)
}
buf := make([]byte, sizes.KibiByte.MustInt())
for i := range 16 {
if n, err = cm.Read(buf); err != nil {
t.Fatalf("failed to read: %v", err)
}
crc = binary.BigEndian.Uint32(buf[n-4:])
if crc != crc32.Checksum(buf[:n-4], crc32.MakeTable(crc32.Castagnoli)) {
t.Fatalf("mismatch checksum")
}
if i == 0 {
h.Write(buf[8 : n-4])
} else {
h.Write(buf[4 : n-4])
}
}
if n, err = cm.Read(buf); err != nil && !errors.Is(err, io.EOF) {
t.Fatalf("failed to read: %v", err)
}
if err == nil {
t.Fatal("unexpected nil error")
}
crc = binary.BigEndian.Uint32(buf[n-4:])
if crc != crc32.Checksum(buf[:n-4], crc32.MakeTable(crc32.Castagnoli)) {
t.Fatalf("mismatch checksum")
}
h.Write(buf[4 : n-36])
if !bytes.Equal(h.Sum(nil), buf[n-36:n-4]) {
t.Fatal("mismatch hash")
}
}

197
testlib/server/server.go Normal file
View File

@@ -0,0 +1,197 @@
package server
/*
Copyright 2025 Suyono <suyono3484@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"fmt"
"net"
"sync"
"time"
"gitea.suyono.dev/suyono/netbounce/slicewriter"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
func OpenPorts(wg *sync.WaitGroup, ctx context.Context) {
openPorts(wg, ctx)
}
func openPorts(wg *sync.WaitGroup, ctx context.Context) {
tcpPorts := viper.GetStringSlice("tcp")
for _, tcpPort := range tcpPorts {
wg.Add(1)
go listen(ctx, wg, tcpPort)
}
udpPorts := viper.GetStringSlice("udp")
for _, udpPort := range udpPorts {
wg.Add(1)
go bindUDP(ctx, wg, udpPort)
}
}
func ClosePacket(ctx context.Context, conn net.PacketConn) {
<-ctx.Done()
_ = conn.Close()
}
func bindUDP(ctx context.Context, wg *sync.WaitGroup, address string) {
defer wg.Done()
var (
conn net.PacketConn
err error
buf, b []byte
n int
addr net.Addr
)
log.Debug().Caller().Msgf("binding socket for UDP on %v", address)
if conn, err = net.ListenPacket("udp", address); err != nil {
panic(fmt.Errorf("failed to bind udp address: %v", err))
}
go ClosePacket(ctx, conn)
buf = make([]byte, 4096)
udpLoop:
for {
if n, addr, err = conn.ReadFrom(buf); err != nil && n == 0 {
select {
case <-ctx.Done():
break udpLoop
default:
}
log.Error().Err(err).Msg("failed to read packet")
continue udpLoop
}
log.Info().Str("client", addr.String()).Msgf("received message: %s", buf[:n])
select {
case <-ctx.Done():
break udpLoop
default:
}
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "server: %s | UDP | %v", viper.GetString("name"), time.Now()); err != nil {
log.Error().Err(err).Msg("build server message")
}
b = sb.Bytes()
if n, err = conn.WriteTo(b, addr); err != nil {
select {
case <-ctx.Done():
break udpLoop
default:
}
log.Error().Err(err).Str("client", addr.String()).Msg("failed to write packet")
continue udpLoop
}
if n != len(b) {
log.Debug().Str("client", addr.String()).Msg("incomplete packet sent")
}
log.Info().Str("client", addr.String()).Msg("packet received and replied")
}
}
func CloseListener(ctx context.Context, listener net.Listener) {
<-ctx.Done()
_ = listener.Close()
}
func listen(ctx context.Context, wg *sync.WaitGroup, address string) {
defer wg.Done()
var (
listener net.Listener
err error
conn net.Conn
)
log.Debug().Caller().Msgf("listeng for TCP on %v", address)
if listener, err = net.Listen("tcp", address); err != nil {
log.Error().Err(err).Str("address", address).Msg("failed to listen")
return
}
go CloseListener(ctx, listener)
tcpIncoming:
for {
if conn, err = listener.Accept(); err != nil {
select {
case <-ctx.Done():
break tcpIncoming
default:
}
log.Error().Err(err).Str("address", address).Msg("failed to accept connection")
continue tcpIncoming
}
wg.Add(1)
go handleTCP(ctx, wg, conn)
}
}
func CloseConnection(ctx context.Context, conn net.Conn) {
<-ctx.Done()
_ = conn.Close()
}
func handleTCP(ctx context.Context, wg *sync.WaitGroup, conn net.Conn) {
defer wg.Done()
defer func() {
_ = conn.Close()
}()
var (
buf, b []byte
err error
n int
)
buf = make([]byte, 4096)
addr := conn.RemoteAddr()
cctx, cancel := context.WithCancel(ctx)
go CloseConnection(cctx, conn)
defer cancel()
for {
if n, err = conn.Read(buf); err != nil {
log.Error().Err(err).Str("client", addr.String()).Msg("failed to read data from TCP connection")
return
}
log.Info().Str("client", addr.String()).Msgf("received message: %s", buf[:n])
sb := slicewriter.NewSliceWriter(buf)
if _, err = fmt.Fprintf(sb, "server: %s | TCP | %v", viper.GetString("name"), time.Now()); err != nil {
log.Error().Err(err).Msg("build server message")
}
b = sb.Bytes()
if _, err = conn.Write(b); err != nil {
log.Error().Err(err).Str("client", addr.String()).Msg("failed to write data to TCP connection")
return
}
}
}