From ddc20001db8b4735a330b08eb10ced2d31f8488f Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 14 Nov 2025 11:01:21 +0100 Subject: [PATCH 1/6] Split udp/write operation into a 3-step beginPacket/write/endPacket --- network-api/network-api.go | 77 ++++++++++++++++++++++++++++----- network-api/network-api_test.go | 27 ++++++++++-- 2 files changed, 90 insertions(+), 14 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index d4ec116..8b3b18e 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -47,7 +47,9 @@ func Register(router *msgpackrouter.Router) { _ = router.RegisterMethod("tcp/connectSSL", tcpConnectSSL) _ = router.RegisterMethod("udp/connect", udpConnect) + _ = router.RegisterMethod("udp/beginPacket", udpBeginPacket) _ = router.RegisterMethod("udp/write", udpWrite) + _ = router.RegisterMethod("udp/endPacket", udpEndPacket) _ = router.RegisterMethod("udp/awaitRead", udpAwaitRead) _ = router.RegisterMethod("udp/read", udpRead) _ = router.RegisterMethod("udp/close", udpClose) @@ -58,6 +60,8 @@ var liveConnections = make(map[uint]net.Conn) var liveListeners = make(map[uint]net.Listener) var liveUdpConnections = make(map[uint]net.PacketConn) var udpReadBuffers = make(map[uint][]byte) +var udpWriteTargets = make(map[uint]*net.UDPAddr) +var udpWriteBuffers = make(map[uint][]byte) var nextConnectionID atomic.Uint32 // takeLockAndGenerateNextID generates a new unique ID for a connection or listener. @@ -375,9 +379,9 @@ func udpConnect(ctx context.Context, rpc *msgpackrpc.Connection, params []any) ( return id, nil } -func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { - if len(params) != 4 { - return nil, []any{1, "Invalid number of parameters, expected udpConnId, dest address, dest port, payload"} +func udpBeginPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 3 { + return nil, []any{1, "Invalid number of parameters, expected udpConnId, dest address, dest port"} } id, ok := msgpackrpc.ToUint(params[0]) if !ok { @@ -391,9 +395,33 @@ func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_r if !ok { return nil, []any{1, "Invalid parameter type, expected uint16 for server port"} } - data, ok := params[3].([]byte) + + lock.RLock() + defer lock.RUnlock() + if _, ok := liveUdpConnections[id]; !ok { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } + targetAddr := net.JoinHostPort(targetIP, fmt.Sprintf("%d", targetPort)) + addr, err := net.ResolveUDPAddr("udp", targetAddr) // TODO: This is inefficient, implement some caching + if err != nil { + return nil, []any{3, "Failed to resolve target address: " + err.Error()} + } + udpWriteTargets[id] = addr + udpWriteBuffers[id] = nil + return true, nil +} + +func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 2 { + return nil, []any{1, "Invalid number of parameters, expected expected udpConnId, payload"} + } + id, ok := msgpackrpc.ToUint(params[0]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected int for UDP connection ID"} + } + data, ok := params[1].([]byte) if !ok { - if dataStr, ok := params[3].(string); ok { + if dataStr, ok := params[1].(string); ok { data = []byte(dataStr) } else { // If data is not []byte or string, return an error @@ -402,18 +430,45 @@ func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_r } lock.RLock() - udpConn, ok := liveUdpConnections[id] + udpBuffer, ok := udpWriteBuffers[id] + if ok { + udpWriteBuffers[id] = append(udpBuffer, data...) + } lock.RUnlock() if !ok { return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} } + return len(data), nil +} - targetAddr := net.JoinHostPort(targetIP, fmt.Sprintf("%d", targetPort)) - addr, err := net.ResolveUDPAddr("udp", targetAddr) // TODO: This is inefficient, implement some caching - if err != nil { - return nil, []any{3, "Failed to resolve target address: " + err.Error()} +func udpEndPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 1 { + return nil, []any{1, "Invalid number of parameters, expected expected udpConnId"} } - if n, err := udpConn.WriteTo(data, addr); err != nil { + id, buffExists := msgpackrpc.ToUint(params[0]) + if !buffExists { + return nil, []any{1, "Invalid parameter type, expected int for UDP connection ID"} + } + + var udpBuffer []byte + var udpAddr *net.UDPAddr + lock.RLock() + udpConn, connExists := liveUdpConnections[id] + if connExists { + udpBuffer, buffExists = udpWriteBuffers[id] + udpAddr = udpWriteTargets[id] + delete(udpWriteBuffers, id) + delete(udpWriteTargets, id) + } + lock.RUnlock() + if !connExists { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } + if !buffExists { + return nil, []any{3, fmt.Sprintf("No UDP packet begun for ID: %d", id)} + } + + if n, err := udpConn.WriteTo(udpBuffer, udpAddr); err != nil { return nil, []any{4, "Failed to write to UDP connection: " + err.Error()} } else { return n, nil diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index 079e71b..afac674 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -248,7 +248,13 @@ func TestUDPNetworkAPI(t *testing.T) { require.NotEqual(t, conn1, conn2) { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Hello")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9900}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("Hello")}) + require.Nil(t, err) + require.Equal(t, 5, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 5, res) } @@ -262,12 +268,27 @@ func TestUDPNetworkAPI(t *testing.T) { require.Equal(t, []uint8("Hello"), res2) } { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("One")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9900}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("On")}) + require.Nil(t, err) + require.Equal(t, 2, res) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("e")}) + require.Nil(t, err) + require.Equal(t, 1, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 3, res) } { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9900, []byte("Two")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9900}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("Two")}) + require.Nil(t, err) + require.Equal(t, 3, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 3, res) } From 261a9865f95504355576cbbd5daa85a048cc7ada Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 14 Nov 2025 11:03:05 +0100 Subject: [PATCH 2/6] Renamed udp/awaitRead -> udp/awaitPacket --- network-api/network-api.go | 4 ++-- network-api/network-api_test.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index 8b3b18e..897c60a 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -50,7 +50,7 @@ func Register(router *msgpackrouter.Router) { _ = router.RegisterMethod("udp/beginPacket", udpBeginPacket) _ = router.RegisterMethod("udp/write", udpWrite) _ = router.RegisterMethod("udp/endPacket", udpEndPacket) - _ = router.RegisterMethod("udp/awaitRead", udpAwaitRead) + _ = router.RegisterMethod("udp/awaitPacket", udpAwaitPacket) _ = router.RegisterMethod("udp/read", udpRead) _ = router.RegisterMethod("udp/close", udpClose) } @@ -475,7 +475,7 @@ func udpEndPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []any) } } -func udpAwaitRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { +func udpAwaitPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { if len(params) != 1 && len(params) != 2 { return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID[, optional timeout in ms])"} } diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index afac674..d6b2aea 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -259,7 +259,7 @@ func TestUDPNetworkAPI(t *testing.T) { require.Equal(t, 5, res) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, []any{5, "127.0.0.1", 9800}, res) @@ -293,7 +293,7 @@ func TestUDPNetworkAPI(t *testing.T) { require.Equal(t, 3, res) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, []any{3, "127.0.0.1", 9800}, res) @@ -302,7 +302,7 @@ func TestUDPNetworkAPI(t *testing.T) { require.Equal(t, []uint8("One"), res2) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, []any{3, "127.0.0.1", 9800}, res) @@ -337,7 +337,7 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, 5, res) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, 5, res.([]any)[0]) @@ -360,7 +360,7 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, 3, res) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, 3, res.([]any)[0]) @@ -369,7 +369,7 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, []uint8("One"), res2) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, 3, res.([]any)[0]) @@ -387,13 +387,13 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { }() { start := time.Now() - res, err := udpAwaitRead(ctx, nil, []any{conn2, 10}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2, 10}) require.Less(t, time.Since(start), 20*time.Millisecond) require.Equal(t, []any{5, "Timeout"}, err) require.Nil(t, res) } { - res, err := udpAwaitRead(ctx, nil, []any{conn2, 0}) + res, err := udpAwaitPacket(ctx, nil, []any{conn2, 0}) require.Nil(t, err) require.Equal(t, 5, res.([]any)[0]) From e39f602f12db1490e0cc237ebf6b6ae105157606 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 14 Nov 2025 11:03:23 +0100 Subject: [PATCH 3/6] Added udp/dropPacket --- network-api/network-api.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/network-api/network-api.go b/network-api/network-api.go index 897c60a..905756e 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -52,6 +52,7 @@ func Register(router *msgpackrouter.Router) { _ = router.RegisterMethod("udp/endPacket", udpEndPacket) _ = router.RegisterMethod("udp/awaitPacket", udpAwaitPacket) _ = router.RegisterMethod("udp/read", udpRead) + _ = router.RegisterMethod("udp/dropPacket", udpDropPacket) _ = router.RegisterMethod("udp/close", udpClose) } @@ -527,6 +528,24 @@ func udpAwaitPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []an return []any{n, host, port}, nil } +func udpDropPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { + if len(params) != 1 && len(params) != 2 { + return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID[, optional timeout in ms])"} + } + id, ok := msgpackrpc.ToUint(params[0]) + if !ok { + return nil, []any{1, "Invalid parameter type, expected uint for UDP connection ID"} + } + + lock.RLock() + delete(udpReadBuffers, id) + lock.RUnlock() + if !ok { + return nil, []any{2, fmt.Sprintf("UDP connection not found for ID: %d", id)} + } + return true, nil +} + func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { if len(params) != 2 && len(params) != 3 { return nil, []any{1, "Invalid number of parameters, expected (UDP connection ID, max bytes to read)"} From a607f4a1c6e0c53379ec86794d822b14ee9299a4 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 14 Nov 2025 11:05:29 +0100 Subject: [PATCH 4/6] Enforce awaitPacket behaviour on unit tests --- network-api/network-api.go | 2 +- network-api/network-api_test.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index 905756e..42f59e8 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -568,7 +568,7 @@ func udpRead(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_re udpReadBuffers[id] = buffer[maxBytes:] n = maxBytes } else { - udpReadBuffers[id] = nil + delete(udpReadBuffers, id) } } lock.Unlock() diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index d6b2aea..b5d80c1 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -297,11 +297,14 @@ func TestUDPNetworkAPI(t *testing.T) { require.Nil(t, err) require.Equal(t, []any{3, "127.0.0.1", 9800}, res) - res2, err := udpRead(ctx, nil, []any{conn2, 100}) + // A partial read of a packet is allowed + res2, err := udpRead(ctx, nil, []any{conn2, 2}) require.Nil(t, err) - require.Equal(t, []uint8("One"), res2) + require.Equal(t, []uint8("On"), res2) } { + // Even if the previous packet was only partially read, + // the next packet can be received res, err := udpAwaitPacket(ctx, nil, []any{conn2}) require.Nil(t, err) require.Equal(t, []any{3, "127.0.0.1", 9800}, res) From f36efbd932d21abdb9e06bff8fa31758fa806193 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 14 Nov 2025 11:33:15 +0100 Subject: [PATCH 5/6] Updated tests --- network-api/network-api_test.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/network-api/network-api_test.go b/network-api/network-api_test.go index b5d80c1..7f24fd3 100644 --- a/network-api/network-api_test.go +++ b/network-api/network-api_test.go @@ -335,7 +335,13 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.NotEqual(t, conn1, conn2) { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("Hello")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9901}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("Hello")}) + require.Nil(t, err) + require.Equal(t, 5, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 5, res) } @@ -353,12 +359,24 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { require.Equal(t, []uint8("llo"), res2) } { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("One")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9901}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("One")}) + require.Nil(t, err) + require.Equal(t, 3, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 3, res) } { - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("Two")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9901}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("Two")}) + require.Nil(t, err) + require.Equal(t, 3, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 3, res) } @@ -384,7 +402,13 @@ func TestUDPNetworkUnboundClientAPI(t *testing.T) { // Check timeouts go func() { time.Sleep(200 * time.Millisecond) - res, err := udpWrite(ctx, nil, []any{conn1, "127.0.0.1", 9901, []byte("Three")}) + res, err := udpBeginPacket(ctx, nil, []any{conn1, "127.0.0.1", 9901}) + require.Nil(t, err) + require.True(t, res.(bool)) + res, err = udpWrite(ctx, nil, []any{conn1, []byte("Three")}) + require.Nil(t, err) + require.Equal(t, 5, res) + res, err = udpEndPacket(ctx, nil, []any{conn1}) require.Nil(t, err) require.Equal(t, 5, res) }() From dea5db021785bf6c5bb3eee1815212cd06582659 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 14 Nov 2025 14:52:52 +0100 Subject: [PATCH 6/6] Fix typo --- network-api/network-api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network-api/network-api.go b/network-api/network-api.go index 42f59e8..566d0f8 100644 --- a/network-api/network-api.go +++ b/network-api/network-api.go @@ -414,7 +414,7 @@ func udpBeginPacket(ctx context.Context, rpc *msgpackrpc.Connection, params []an func udpWrite(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) { if len(params) != 2 { - return nil, []any{1, "Invalid number of parameters, expected expected udpConnId, payload"} + return nil, []any{1, "Invalid number of parameters, expected udpConnId, payload"} } id, ok := msgpackrpc.ToUint(params[0]) if !ok {