mirror of
https://github.com/Facepunch/sbox-public.git
synced 2026-04-19 05:48:07 -04:00
Network: Broadcast and Chunking compression optimization (#4417)
**Broadcast** now encodes the wire payload just once and sends the same bytes to every recipient, before we did one redundant compression per connection. This primarily reduces CPU load on the server/host. **Chunking** Large messages are now compressed before chunking rather than after. Resulting in slightly smaller payloads. The receiver now decompresses a single reassembled payload instead of decompressing every chunk independently, significantly reducing CPU load on receiving clients. **Refactor** Chunking and compression are now low-level wire concerns handled by Connection rather than being mixed into the high-level message types. The old `InternalMessageType.Chunk` enum is removed; chunk framing uses a dedicated wire flag byte alongside `FlagRaw` and `FlagCompressed`. **Results (Chunking changes)** Synthetic data, results on real payloads may differ. Benchmarks (1000 GOs / 2000 components, ~1MB payload, 500 iterations): Wire size (chunk-first-then-compress): 275KB Wire size (compress-first): 259KB (5.7% smaller) Send chunk-first: 0.85 ms/op (old) Send compress-first: 0.88 ms/op (new) Recv chunk-first: 1.16 ms/op (old) Recv compress-first: 0.34 ms/op (new, 3.4x faster)
This commit is contained in:
@@ -13,7 +13,7 @@ public static class Protocol
|
||||
/// <summary>
|
||||
/// We cannot talk to servers or clients with a network protocol different to this.
|
||||
/// </summary>
|
||||
public static int Network => 1099;
|
||||
public static int Network => 1100;
|
||||
}
|
||||
|
||||
// Api Versions
|
||||
@@ -26,6 +26,7 @@ public static class Protocol
|
||||
|
||||
|
||||
// Network Versions
|
||||
// 1100. 30th March 2026 - Compress-before-chunk, chunking moved to wire layer
|
||||
// 1099. 24th Feburary 2026 - ResourceId is now a long
|
||||
// 1098. 20th Feburary 2026 - Send network time as double
|
||||
// 1097. 13rd January 2026 - Support for binary blobs
|
||||
|
||||
@@ -880,7 +880,7 @@ public partial class GameObject
|
||||
bs.Write( RootGameObject.Id );
|
||||
bs.Write( isCulled );
|
||||
|
||||
target.SendRawMessage( bs );
|
||||
target.SendStream( bs );
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -121,7 +121,7 @@ public partial class SceneNetworkSystem : GameNetworkSystem
|
||||
continue;
|
||||
|
||||
PendingSceneLoads[c.Id] = loadMsg.Id;
|
||||
c.SendRawMessage( msg );
|
||||
c.SendStream( msg );
|
||||
c.State = Connection.ChannelState.MountVPKs;
|
||||
}
|
||||
|
||||
@@ -297,7 +297,7 @@ public partial class SceneNetworkSystem : GameNetworkSystem
|
||||
bs.Write( InternalMessageType.Packed );
|
||||
|
||||
Networking.System.Serialize( output, ref bs );
|
||||
connection.SendRawMessage( bs );
|
||||
connection.SendStream( bs );
|
||||
|
||||
bs.Dispose();
|
||||
}
|
||||
|
||||
@@ -140,7 +140,7 @@ public partial class Scene : GameObject
|
||||
userCommand.Serialize( ref msg );
|
||||
}
|
||||
|
||||
connection.SendRawMessage( msg, NetFlags.UnreliableNoDelay );
|
||||
connection.SendStream( msg, NetFlags.UnreliableNoDelay );
|
||||
msg.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,12 +28,6 @@ public static partial class Networking
|
||||
if ( !IsRecordingMessages )
|
||||
return;
|
||||
|
||||
// Skip fragments and anything too large to be compressed as a single unit.
|
||||
if ( data.Length > 127 * 1024 )
|
||||
return;
|
||||
if ( data.Length > 0 && (InternalMessageType)data[0] == InternalMessageType.Chunk )
|
||||
return;
|
||||
|
||||
var category = GetMessageCategory( data );
|
||||
var bytes = data.ToArray();
|
||||
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
using Sandbox.Compression;
|
||||
using Sandbox.Network;
|
||||
using Sandbox.Network;
|
||||
using Sandbox.Utility;
|
||||
using Sentry;
|
||||
using Steamworks;
|
||||
using Steamworks.Data;
|
||||
using System.Buffers.Binary;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using Steam = NativeEngine.Steam;
|
||||
@@ -28,82 +25,6 @@ public static partial class Networking
|
||||
|
||||
internal static Dictionary<string, string> ServerData { get; set; } = new();
|
||||
|
||||
private const byte FlagUncompressed = 0;
|
||||
private const byte FlagLz4 = 1;
|
||||
|
||||
/// <summary>
|
||||
/// The minimum byte count required to compress using LZ4 encoding. This number
|
||||
/// was chosen because the overhead is often not worth it otherwise.
|
||||
/// </summary>
|
||||
private const int MinimumCompressionByteCount = 128;
|
||||
|
||||
/// <summary>
|
||||
/// Try to encode the data from the specified <see cref="ByteStream"/> using LZ4 encoding.
|
||||
/// If the data is less than the required byte count, the data will not be compressed.
|
||||
/// </summary>
|
||||
internal static byte[] EncodeStream( ByteStream stream )
|
||||
{
|
||||
var src = stream.ToSpan();
|
||||
|
||||
// Compress only if it’s large enough
|
||||
if ( src.Length > MinimumCompressionByteCount )
|
||||
{
|
||||
var compressed = LZ4.CompressBlock( src );
|
||||
|
||||
// Only keep compression if it actually helped
|
||||
if ( compressed.Length < src.Length )
|
||||
{
|
||||
var output = new byte[1 + sizeof( int ) + compressed.Length];
|
||||
output[0] = FlagLz4;
|
||||
|
||||
BinaryPrimitives.WriteInt32LittleEndian( output.AsSpan( 1 ), src.Length );
|
||||
compressed.CopyTo( output.AsSpan( 1 + sizeof( int ) ) );
|
||||
|
||||
return output;
|
||||
}
|
||||
}
|
||||
|
||||
var result = new byte[1 + sizeof( int ) + src.Length];
|
||||
result[0] = FlagUncompressed;
|
||||
|
||||
BinaryPrimitives.WriteInt32LittleEndian( result.AsSpan( 1 ), src.Length );
|
||||
src.CopyTo( result.AsSpan( 1 + sizeof( int ) ) );
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static readonly byte[] ReceiveBuffer = new byte[1024 * 1024 * 4];
|
||||
|
||||
/// <summary>
|
||||
/// Try to decode the supplied data using LZ4. If the data cannot be decompressed, then the
|
||||
/// original data will be returned.
|
||||
/// </summary>
|
||||
internal static Span<byte> DecodeStream( byte[] data )
|
||||
{
|
||||
if ( data.Length < 1 + sizeof( int ) )
|
||||
return data;
|
||||
|
||||
var flag = data[0];
|
||||
var originalLen = BinaryPrimitives.ReadInt32LittleEndian( data.AsSpan( 1, sizeof( int ) ) );
|
||||
ReadOnlySpan<byte> payload = data.AsSpan( 1 + sizeof( int ) );
|
||||
|
||||
switch ( flag )
|
||||
{
|
||||
case FlagUncompressed:
|
||||
return MemoryMarshal.CreateSpan( ref MemoryMarshal.GetArrayDataReference( data ), data.Length )
|
||||
.Slice( 1 + sizeof( int ), originalLen );
|
||||
case FlagLz4:
|
||||
{
|
||||
int written = LZ4.DecompressBlock( payload.ToArray(), ReceiveBuffer );
|
||||
var result = ReceiveBuffer.AsSpan( 0, written );
|
||||
TryRecordMessage( result );
|
||||
return result;
|
||||
}
|
||||
default:
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Set data about the current server or lobby. Other players can query this
|
||||
/// when searching for a game. Note: for now, try to keep the key and value as short
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
using Sandbox.Network;
|
||||
using System.Buffers;
|
||||
using System.Buffers.Binary;
|
||||
using System.IO;
|
||||
|
||||
namespace Sandbox;
|
||||
|
||||
public abstract partial class Connection
|
||||
{
|
||||
// Per-connection chunk reassembly buffer — rented from ArrayPool for the duration of a
|
||||
// single chunked message assembly, then returned immediately after delivery.
|
||||
// _chunkBufferLength == -1 signals "no assembly in progress".
|
||||
byte[] _chunkBuffer;
|
||||
int _chunkBufferLength = -1;
|
||||
uint _chunkExpectedIndex;
|
||||
|
||||
/// <summary>
|
||||
/// Entry point for all incoming transport packets. Handles chunk reassembly transparently;
|
||||
/// fully assembled (or non-chunked) payloads are decoded and dispatched to <paramref name="handler"/>.
|
||||
/// </summary>
|
||||
internal void OnRawPacketReceived( ReadOnlySpan<byte> rawPacket, NetworkSystem.MessageHandler handler )
|
||||
{
|
||||
if ( rawPacket.Length < 1 ) return;
|
||||
|
||||
if ( rawPacket[0] == FlagChunk )
|
||||
{
|
||||
AssembleChunk( rawPacket, handler );
|
||||
return;
|
||||
}
|
||||
|
||||
DeliverDecoded( rawPacket, handler );
|
||||
}
|
||||
|
||||
private void AssembleChunk( ReadOnlySpan<byte> rawPacket, NetworkSystem.MessageHandler handler )
|
||||
{
|
||||
if ( rawPacket.Length < 9 ) throw new InvalidDataException( "Chunk packet too short" );
|
||||
|
||||
var index = BinaryPrimitives.ReadUInt32LittleEndian( rawPacket.Slice( 1 ) );
|
||||
var total = BinaryPrimitives.ReadUInt32LittleEndian( rawPacket.Slice( 5 ) );
|
||||
var chunkData = rawPacket.Slice( 9 );
|
||||
|
||||
if ( index + 1 > total ) throw new InvalidDataException( $"chunkIndex {index} >= total {total}" );
|
||||
if ( total <= 1 ) throw new InvalidDataException( "Chunk total must be > 1" );
|
||||
if ( total > 1024 ) throw new InvalidDataException( $"Chunk total {total} exceeds 1024 limit" );
|
||||
|
||||
Log.Trace( $"Reading Chunk {index + 1} of {total} ({chunkData.Length}b, from {this})" );
|
||||
|
||||
if ( index == 0 )
|
||||
{
|
||||
// Pre-size to the upper bound: total chunks × max chunk size.
|
||||
if ( _chunkBuffer != null )
|
||||
ArrayPool<byte>.Shared.Return( _chunkBuffer );
|
||||
_chunkBuffer = ArrayPool<byte>.Shared.Rent( (int)total * MaxChunkSize );
|
||||
_chunkBufferLength = 0;
|
||||
_chunkExpectedIndex = 0;
|
||||
}
|
||||
|
||||
//
|
||||
// This can happen when leaving a lobby (usually during connect) and then rejoining:
|
||||
// packets from the previous connection arrive after the new one has started.
|
||||
//
|
||||
if ( _chunkBufferLength < 0 )
|
||||
throw new InvalidDataException( $"Received chunk {index + 1} of {total} with no assembly in progress for {this}" );
|
||||
|
||||
// Chunks must arrive in strict sequential order. Out-of-order or duplicate chunks
|
||||
// would silently corrupt the reassembled payload.
|
||||
if ( index != _chunkExpectedIndex )
|
||||
{
|
||||
_chunkBufferLength = -1;
|
||||
throw new InvalidDataException( $"Expected chunk {_chunkExpectedIndex} but received {index} of {total} from {this}" );
|
||||
}
|
||||
|
||||
_chunkExpectedIndex++;
|
||||
|
||||
if ( chunkData.Length > MaxChunkSize )
|
||||
throw new InvalidDataException( $"Chunk payload {chunkData.Length}b exceeds MaxChunkSize ({MaxChunkSize}b) from {this}" );
|
||||
|
||||
if ( _chunkBufferLength + chunkData.Length > _chunkBuffer.Length )
|
||||
throw new InvalidDataException( $"Chunk overflows reassembly buffer ({_chunkBufferLength} + {chunkData.Length} > {_chunkBuffer.Length}) from {this}" );
|
||||
|
||||
chunkData.CopyTo( _chunkBuffer.AsSpan( _chunkBufferLength ) );
|
||||
_chunkBufferLength += chunkData.Length;
|
||||
|
||||
Log.Trace( $"Chunk buffer is now {_chunkBufferLength}b" );
|
||||
|
||||
if ( index + 1 < total ) return; // Not the final chunk yet.
|
||||
|
||||
var assembledLength = _chunkBufferLength;
|
||||
var assembledBuffer = _chunkBuffer;
|
||||
_chunkBufferLength = -1;
|
||||
_chunkBuffer = null;
|
||||
|
||||
try
|
||||
{
|
||||
DeliverDecoded( assembledBuffer.AsSpan( 0, assembledLength ), handler );
|
||||
}
|
||||
finally
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return( assembledBuffer );
|
||||
}
|
||||
}
|
||||
|
||||
private void DeliverDecoded( ReadOnlySpan<byte> encoded, NetworkSystem.MessageHandler handler )
|
||||
{
|
||||
var decoded = DecodeStream( encoded );
|
||||
using var stream = ByteStream.CreateReader( decoded );
|
||||
handler( new NetworkSystem.NetworkMessage { Source = this, Data = stream } );
|
||||
MessagesRecieved++;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the rented chunk reassembly buffer to the shared pool, if one is held.
|
||||
/// Called from <see cref="Connection.Close"/>.
|
||||
/// </summary>
|
||||
internal void ReleaseChunkBuffer()
|
||||
{
|
||||
if ( _chunkBuffer != null )
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return( _chunkBuffer );
|
||||
_chunkBuffer = null;
|
||||
_chunkBufferLength = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,7 @@ public abstract partial class Connection
|
||||
msg.Write( InternalMessageType.Packed );
|
||||
|
||||
System.Serialize( t, ref msg );
|
||||
SendRawMessage( msg );
|
||||
SendStream( msg );
|
||||
|
||||
msg.Dispose();
|
||||
|
||||
@@ -44,7 +44,7 @@ public abstract partial class Connection
|
||||
msg.Write( InternalMessageType.Packed );
|
||||
|
||||
System.Serialize( t, ref msg );
|
||||
SendRawMessage( msg );
|
||||
SendStream( msg );
|
||||
|
||||
msg.Dispose();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
using Sandbox.Compression;
|
||||
using Sandbox.Network;
|
||||
using System.Buffers.Binary;
|
||||
using System.IO;
|
||||
|
||||
namespace Sandbox;
|
||||
|
||||
public abstract partial class Connection
|
||||
{
|
||||
// Transport-level wire flags — the first byte of every packet sent or received.
|
||||
internal const byte FlagRaw = 0; // [FlagRaw:1][data:N] — no origLen needed
|
||||
internal const byte FlagCompressed = 1; // [FlagCompressed:1][origLen:4][lz4:N]
|
||||
internal const byte FlagChunk = 2; // [FlagChunk:1][index:4][total:4][data:N]
|
||||
|
||||
private const int MinimumCompressionByteCount = 128;
|
||||
|
||||
/// <summary>
|
||||
/// Upper bound on decompressed payload size. Any <see cref="FlagCompressed"/> packet claiming a
|
||||
/// larger original length is treated as corrupt/malicious and rejected.
|
||||
/// </summary>
|
||||
private const int MaxDecompressedByteSize = 256 * 1024 * 1024;
|
||||
|
||||
/// <summary>
|
||||
/// Encode a <see cref="ByteStream"/> into a wire-ready byte array, applying LZ4 when beneficial.
|
||||
/// Wire format: <c>[FlagRaw][data]</c> or <c>[FlagCompressed][origLen:4][lz4data]</c>.
|
||||
/// </summary>
|
||||
internal static byte[] EncodeStream( ByteStream stream )
|
||||
{
|
||||
var src = stream.ToSpan();
|
||||
|
||||
if ( src.Length > MinimumCompressionByteCount )
|
||||
{
|
||||
var compressed = LZ4.CompressBlock( src );
|
||||
|
||||
if ( compressed.Length < src.Length )
|
||||
{
|
||||
var output = new byte[1 + sizeof( int ) + compressed.Length];
|
||||
output[0] = FlagCompressed;
|
||||
BinaryPrimitives.WriteInt32LittleEndian( output.AsSpan( 1 ), src.Length );
|
||||
compressed.CopyTo( output.AsSpan( 1 + sizeof( int ) ) );
|
||||
return output;
|
||||
}
|
||||
}
|
||||
|
||||
var result = new byte[1 + src.Length];
|
||||
result[0] = FlagRaw;
|
||||
src.CopyTo( result.AsSpan( 1 ) );
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decode a wire payload (<see cref="FlagRaw"/> or <see cref="FlagCompressed"/>).
|
||||
/// <see cref="FlagChunk"/> packets must be fully reassembled by <see cref="OnRawPacketReceived"/>
|
||||
/// before reaching this method.
|
||||
/// </summary>
|
||||
internal static ReadOnlySpan<byte> DecodeStream( ReadOnlySpan<byte> data )
|
||||
{
|
||||
if ( data.Length < 1 )
|
||||
return ReadOnlySpan<byte>.Empty;
|
||||
|
||||
switch ( data[0] )
|
||||
{
|
||||
case FlagRaw:
|
||||
return data.Slice( 1 );
|
||||
case FlagCompressed:
|
||||
{
|
||||
const int headerSize = 1 + sizeof( int ); // flag + origLen
|
||||
if ( data.Length < headerSize )
|
||||
throw new InvalidDataException( $"Compressed packet too short ({data.Length}b, need {headerSize}b)" );
|
||||
|
||||
var origLen = BinaryPrimitives.ReadInt32LittleEndian( data.Slice( 1, sizeof( int ) ) );
|
||||
if ( origLen <= 0 || origLen > MaxDecompressedByteSize )
|
||||
throw new InvalidDataException( $"Compressed origLen {origLen} out of range (1..{MaxDecompressedByteSize})" );
|
||||
|
||||
var lz4Data = data.Slice( headerSize );
|
||||
var decompressed = new byte[origLen];
|
||||
int written = LZ4.DecompressBlock( lz4Data, decompressed );
|
||||
|
||||
if ( written != origLen )
|
||||
throw new InvalidDataException( $"LZ4 decompressed {written}b but header claimed {origLen}b" );
|
||||
|
||||
Networking.TryRecordMessage( decompressed );
|
||||
return decompressed;
|
||||
}
|
||||
default:
|
||||
throw new InvalidOperationException( $"Unknown wire flag {data[0]}" );
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Build a transport-level chunk packet: <c>[FlagChunk:1][index:4][total:4][data:N]</c>.
|
||||
/// Chunks carry raw slices of an already-encoded (possibly compressed) payload.
|
||||
/// Chunk header is 9 bytes vs the old 14-byte envelope.
|
||||
/// </summary>
|
||||
internal static byte[] BuildChunkPacket( byte[] encoded, int offset, int length, int chunkIndex, int totalChunks )
|
||||
{
|
||||
var result = new byte[1 + sizeof( uint ) + sizeof( uint ) + length]; // 9-byte header
|
||||
result[0] = FlagChunk;
|
||||
BinaryPrimitives.WriteUInt32LittleEndian( result.AsSpan( 1 ), (uint)chunkIndex );
|
||||
BinaryPrimitives.WriteUInt32LittleEndian( result.AsSpan( 5 ), (uint)totalChunks );
|
||||
encoded.AsSpan( offset, length ).CopyTo( result.AsSpan( 9 ) );
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@ namespace Sandbox;
|
||||
[Expose, ActionGraphIgnore]
|
||||
public abstract partial class Connection
|
||||
{
|
||||
internal abstract void InternalSend( ByteStream stream, NetFlags flags );
|
||||
internal abstract void InternalSend( byte[] data, NetFlags flags );
|
||||
internal abstract void InternalRecv( NetworkSystem.MessageHandler handler );
|
||||
internal abstract void InternalClose( int closeCode, string closeReason );
|
||||
|
||||
@@ -280,7 +280,7 @@ public abstract partial class Connection
|
||||
|
||||
System.Serialize( t, ref msg );
|
||||
|
||||
SendRawMessage( msg );
|
||||
SendStream( msg );
|
||||
msg.Dispose();
|
||||
}
|
||||
|
||||
@@ -307,44 +307,43 @@ public abstract partial class Connection
|
||||
|
||||
System.Serialize( t, ref msg );
|
||||
|
||||
SendRawMessage( msg, flags );
|
||||
SendStream( msg, flags );
|
||||
msg.Dispose();
|
||||
}
|
||||
|
||||
internal virtual void SendRawMessage( ByteStream stream, NetFlags flags = NetFlags.Reliable )
|
||||
{
|
||||
// Note: this is basically quater of k_cbMaxSteamNetworkingSocketsMessageSizeSend
|
||||
var maxChunkSize = 128 * 1024;
|
||||
var isReliableMessage = (flags & NetFlags.Reliable) != 0;
|
||||
/// <summary>
|
||||
/// Virtual override point — exists so <see cref="MockConnection"/> can intercept the raw
|
||||
/// <see cref="ByteStream"/> for routing through the host before encoding happens.
|
||||
/// All other code should prefer calling <see cref="Send"/> with already-encoded bytes.
|
||||
/// </summary>
|
||||
internal virtual void SendStream( ByteStream stream, NetFlags flags = NetFlags.Reliable ) => Send( EncodeStream( stream ), flags );
|
||||
|
||||
if ( !isReliableMessage || stream.Length < maxChunkSize )
|
||||
/// <summary>
|
||||
/// Send an already wire-encoded payload. Chunks it into <see cref="MaxChunkSize"/> packets
|
||||
/// if the payload exceeds the threshold and the message is reliable.
|
||||
/// </summary>
|
||||
internal const int MaxChunkSize = 128 * 1024; // ~quarter of Steam's k_cbMaxSteamNetworkingSocketsMessageSizeSend
|
||||
|
||||
internal virtual void Send( byte[] encoded, NetFlags flags )
|
||||
{
|
||||
var isReliable = (flags & NetFlags.Reliable) != 0;
|
||||
|
||||
if ( !isReliable || encoded.Length < MaxChunkSize )
|
||||
{
|
||||
InternalSend( stream, flags );
|
||||
InternalSend( encoded, flags );
|
||||
return;
|
||||
}
|
||||
|
||||
//
|
||||
// Split messages into multiple parts, this should hardly ever happen.
|
||||
//
|
||||
var chunks = (encoded.Length / (float)MaxChunkSize).CeilToInt();
|
||||
|
||||
var chunkHeader = 32;
|
||||
var chunks = (stream.Length / (float)maxChunkSize).CeilToInt();
|
||||
Log.Trace( $"splitting {encoded.Length}b into {chunks} chunks" );
|
||||
|
||||
Log.Trace( $"splitting {stream.Length} bytes into {chunks} {maxChunkSize}b chunks" );
|
||||
|
||||
for ( int i = 0; i < chunks; i++ )
|
||||
for ( var i = 0; i < chunks; i++ )
|
||||
{
|
||||
using ByteStream chunkMessage = ByteStream.Create( maxChunkSize + chunkHeader );
|
||||
chunkMessage.Write( InternalMessageType.Chunk );
|
||||
chunkMessage.Write( (uint)i );
|
||||
chunkMessage.Write( (uint)chunks );
|
||||
chunkMessage.Write( stream, i * maxChunkSize, maxChunkSize );
|
||||
|
||||
Log.Trace( $"Chunk {i + 1} is {chunkMessage.Length}b" );
|
||||
|
||||
InternalSend( chunkMessage, flags );
|
||||
|
||||
chunkMessage.Dispose();
|
||||
var offset = i * MaxChunkSize;
|
||||
var length = Math.Min( MaxChunkSize, encoded.Length - offset );
|
||||
var chunk = BuildChunkPacket( encoded, offset, length, i, chunks );
|
||||
InternalSend( chunk, flags );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,6 +362,7 @@ public abstract partial class Connection
|
||||
|
||||
internal void Close( int reasonCode, string reasonString )
|
||||
{
|
||||
ReleaseChunkBuffer();
|
||||
InternalClose( reasonCode, reasonString );
|
||||
}
|
||||
|
||||
|
||||
@@ -143,7 +143,7 @@ public abstract partial class GameNetworkSystem : IDisposable
|
||||
}
|
||||
else
|
||||
{
|
||||
targetConnection.SendRawMessage( bs, flags );
|
||||
targetConnection.SendStream( bs, flags );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,7 +166,7 @@ public abstract partial class GameNetworkSystem : IDisposable
|
||||
Log.Warning( e, $"Error when trying to network serialize object: {e.Message}" );
|
||||
}
|
||||
|
||||
connection.SendRawMessage( bs, flags );
|
||||
connection.SendStream( bs, flags );
|
||||
bs.Dispose();
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ public abstract partial class GameNetworkSystem : IDisposable
|
||||
Log.Warning( e, $"Error when trying to network serialize object: {e.Message}" );
|
||||
}
|
||||
|
||||
connection.SendRawMessage( bs, flags );
|
||||
connection.SendStream( bs, flags );
|
||||
|
||||
bs.Dispose();
|
||||
}
|
||||
|
||||
@@ -16,11 +16,6 @@ internal enum InternalMessageType : byte
|
||||
HeartbeatPing,
|
||||
HeartbeatPong,
|
||||
|
||||
/// <summary>
|
||||
/// Is multiple chunks of the same message
|
||||
/// </summary>
|
||||
Chunk,
|
||||
|
||||
/// <summary>
|
||||
/// Is a struct packed using TypeLibrary
|
||||
/// </summary>
|
||||
|
||||
@@ -91,8 +91,6 @@ internal partial class NetworkSystem
|
||||
Connection?.GetIncomingMessages( HandleIncomingMessage );
|
||||
}
|
||||
|
||||
MemoryStream chunkStream;
|
||||
|
||||
void HandleIncomingMessage( NetworkMessage msg )
|
||||
{
|
||||
// Conna: If this message is not from the host and we're still connecting, ignore it.
|
||||
@@ -140,51 +138,6 @@ internal partial class NetworkSystem
|
||||
return;
|
||||
}
|
||||
|
||||
if ( type == InternalMessageType.Chunk )
|
||||
{
|
||||
var index = msg.Data.Read<uint>();
|
||||
var total = msg.Data.Read<uint>();
|
||||
|
||||
if ( index < 0 ) throw new InvalidDataException();
|
||||
if ( index + 1 > total ) throw new InvalidDataException();
|
||||
if ( total <= 1 ) throw new InvalidDataException();
|
||||
if ( total > 1024 ) throw new InvalidDataException();
|
||||
|
||||
if ( index == 0 )
|
||||
{
|
||||
chunkStream = new MemoryStream();
|
||||
}
|
||||
|
||||
unsafe
|
||||
{
|
||||
Log.Trace( $"Reading Chunk {index + 1} of {total} (chunk is {msg.Data.ReadRemaining}b)" );
|
||||
|
||||
//
|
||||
// This can happen when leaving a lobby (usually during connect), and then rejoining it..
|
||||
// getting packets sennt during previous connection. Maybe need smarter headers to avoid it.
|
||||
//
|
||||
Assert.NotNull( chunkStream, $"Reading chunk {index + 1} but not started a chunk!" );
|
||||
|
||||
chunkStream.Write( msg.Data.GetRemainingBytes() );
|
||||
Log.Trace( $"Total chuunk stream is now {chunkStream.Length}b long" );
|
||||
}
|
||||
|
||||
if ( index + 1 == total )
|
||||
{
|
||||
var constructedMessage = new NetworkMessage();
|
||||
constructedMessage.Source = msg.Source;
|
||||
constructedMessage.Data = ByteStream.CreateReader( chunkStream.ToArray() ); //todo make suck less
|
||||
|
||||
chunkStream = null;
|
||||
|
||||
HandleIncomingMessage( constructedMessage );
|
||||
|
||||
constructedMessage.Data.Dispose();
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
var responseTo = Guid.Empty;
|
||||
var requestGuid = Guid.Empty;
|
||||
|
||||
@@ -339,7 +292,7 @@ internal partial class NetworkSystem
|
||||
ByteStream bs = ByteStream.Create( 512 );
|
||||
bs.Write( InternalMessageType.HeartbeatPong );
|
||||
bs.Write( serverRealTime ); // the time they sent
|
||||
source.SendRawMessage( bs );
|
||||
source.SendStream( bs );
|
||||
bs.Dispose();
|
||||
}
|
||||
|
||||
|
||||
@@ -20,18 +20,15 @@ internal partial class NetworkSystem
|
||||
.Distinct();
|
||||
}
|
||||
|
||||
// Encode once so every recipient gets the same wire bytes without re-compressing per connection.
|
||||
var compressed = Connection.EncodeStream( msg );
|
||||
|
||||
foreach ( var c in availableConnections )
|
||||
{
|
||||
if ( c == Connection.Local )
|
||||
continue;
|
||||
|
||||
if ( c.State < minimumState )
|
||||
continue;
|
||||
|
||||
if ( filter.HasValue && !filter.Value.IsRecipient( c ) )
|
||||
continue;
|
||||
|
||||
c.SendRawMessage( msg, flags );
|
||||
if ( c == Connection.Local ) continue;
|
||||
if ( c.State < minimumState ) continue;
|
||||
if ( filter.HasValue && !filter.Value.IsRecipient( c ) ) continue;
|
||||
c.Send( compressed, flags );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -361,7 +361,7 @@ internal partial class NetworkSystem
|
||||
bs.Write( InternalMessageType.HeartbeatPing );
|
||||
bs.Write( RealTime.Now ); // Real time
|
||||
bs.Write( Time.NowDouble ); // Game time
|
||||
c.SendRawMessage( bs );
|
||||
c.SendStream( bs );
|
||||
bs.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ internal class StringTable
|
||||
bs.Write( Name );
|
||||
|
||||
BuildSnapshotMessage( ref bs );
|
||||
source.SendRawMessage( bs );
|
||||
source.SendStream( bs );
|
||||
|
||||
bs.Dispose();
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ internal class EmptyConnection : Connection
|
||||
|
||||
internal override void InternalClose( int closeCode, string closeReason ) { }
|
||||
internal override void InternalRecv( NetworkSystem.MessageHandler handler ) { }
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags ) { }
|
||||
internal override void InternalSend( byte[] data, NetFlags flags ) { }
|
||||
|
||||
public EmptyConnection( Guid id )
|
||||
{
|
||||
|
||||
@@ -11,7 +11,7 @@ internal class LocalConnection : Connection
|
||||
|
||||
internal override void InternalClose( int closeCode, string closeReason ) { }
|
||||
internal override void InternalRecv( NetworkSystem.MessageHandler handler ) { }
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags ) { }
|
||||
internal override void InternalSend( byte[] data, NetFlags flags ) { }
|
||||
|
||||
public LocalConnection( Guid id )
|
||||
{
|
||||
@@ -32,24 +32,13 @@ internal class MockConnection : Connection
|
||||
|
||||
internal override void InternalClose( int closeCode, string closeReason ) { }
|
||||
internal override void InternalRecv( NetworkSystem.MessageHandler handler ) { }
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags ) { }
|
||||
internal override void InternalSend( byte[] data, NetFlags flags ) { }
|
||||
|
||||
internal override void SendRawMessage( ByteStream stream, NetFlags flags = NetFlags.Reliable )
|
||||
internal override void SendStream( ByteStream stream, NetFlags flags = NetFlags.Reliable )
|
||||
{
|
||||
// If we're a mock connection - we don't have a direct connection. We're probably
|
||||
// on a dedicated server, so let's route through the host.
|
||||
|
||||
var availableHost = Host;
|
||||
|
||||
if ( availableHost is null or MockConnection )
|
||||
{
|
||||
if ( Networking.Debug )
|
||||
{
|
||||
Log.Warning( "MockConnection.SendRawMessage: no available host to route through!" );
|
||||
}
|
||||
|
||||
// Route unencoded messages through the host — we don't have a direct transport.
|
||||
if ( !TryGetRoutableHost( out var host ) )
|
||||
return;
|
||||
}
|
||||
|
||||
var wrapper = new TargetedMessage
|
||||
{
|
||||
@@ -59,7 +48,45 @@ internal class MockConnection : Connection
|
||||
Flags = (byte)flags
|
||||
};
|
||||
|
||||
availableHost.SendMessage( wrapper, flags );
|
||||
host.SendMessage( wrapper, flags );
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Override for pre-encoded payloads (e.g. from Broadcast's single-encode path).
|
||||
/// Decodes the wire bytes and routes through the host via <see cref="TargetedInternalMessage"/>.
|
||||
/// </summary>
|
||||
internal override void Send( byte[] encoded, NetFlags flags )
|
||||
{
|
||||
if ( !TryGetRoutableHost( out var host ) )
|
||||
return;
|
||||
|
||||
var decoded = DecodeStream( encoded );
|
||||
|
||||
var wrapper = new TargetedInternalMessage
|
||||
{
|
||||
SenderId = Local.Id,
|
||||
TargetId = Id,
|
||||
Data = decoded.ToArray(),
|
||||
Flags = (byte)flags
|
||||
};
|
||||
|
||||
host.SendMessage( wrapper, flags );
|
||||
}
|
||||
|
||||
private bool TryGetRoutableHost( out Connection host )
|
||||
{
|
||||
host = Host;
|
||||
|
||||
if ( host is null or MockConnection )
|
||||
{
|
||||
if ( Networking.Debug )
|
||||
Log.Warning( $"MockConnection: no available host to route through for {this}" );
|
||||
|
||||
host = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public MockConnection( Guid id )
|
||||
|
||||
@@ -56,12 +56,11 @@ internal unsafe class SteamLobbyConnection : Connection, IValid
|
||||
}
|
||||
}
|
||||
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags )
|
||||
internal override void InternalSend( byte[] data, NetFlags flags )
|
||||
{
|
||||
byte[] output = Networking.EncodeStream( stream );
|
||||
var steamFlags = flags.ToSteamFlags();
|
||||
steamFlags |= 32; // k_nSteamNetworkingSend_AutoRestartBrokenSession
|
||||
Lobby.SendMessage( Friend.Id, output, steamFlags );
|
||||
Lobby.SendMessage( Friend.Id, data, steamFlags );
|
||||
}
|
||||
|
||||
internal override void InternalRecv( NetworkSystem.MessageHandler handler )
|
||||
|
||||
@@ -392,18 +392,7 @@ internal class SteamLobbySocket : NetworkSocket, ILobby
|
||||
if ( !Connections.TryGetValue( msg.SteamId, out var connection ) )
|
||||
continue;
|
||||
|
||||
Span<byte> data = Networking.DecodeStream( msg.Data );
|
||||
|
||||
using var stream = ByteStream.CreateReader( data );
|
||||
|
||||
var nwm = new NetworkSystem.NetworkMessage
|
||||
{
|
||||
Data = stream,
|
||||
Source = connection
|
||||
};
|
||||
|
||||
connection.MessagesRecieved++;
|
||||
handler( nwm );
|
||||
connection.OnRawPacketReceived( msg.Data, handler );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -143,12 +143,11 @@ internal static partial class SteamNetwork
|
||||
return await base.OnReceiveUserInfo( info );
|
||||
}
|
||||
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags )
|
||||
internal override void InternalSend( byte[] data, NetFlags flags )
|
||||
{
|
||||
if ( !Socket.IsValid() )
|
||||
return;
|
||||
|
||||
byte[] data = Networking.EncodeStream( stream );
|
||||
Socket.SendMessage( Handle, data, flags.ToSteamFlags() );
|
||||
}
|
||||
|
||||
@@ -339,14 +338,12 @@ internal static partial class SteamNetwork
|
||||
count = 0;
|
||||
}
|
||||
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags )
|
||||
internal override void InternalSend( byte[] data, NetFlags flags )
|
||||
{
|
||||
byte[] output = Networking.EncodeStream( stream );
|
||||
|
||||
var message = new OutgoingSteamMessage
|
||||
{
|
||||
Connection = handle,
|
||||
Data = output,
|
||||
Data = data,
|
||||
Flags = flags.ToSteamFlags()
|
||||
};
|
||||
|
||||
@@ -357,17 +354,7 @@ internal static partial class SteamNetwork
|
||||
{
|
||||
while ( IncomingMessages.Reader.TryRead( out var msg ) )
|
||||
{
|
||||
Span<byte> output = Networking.DecodeStream( msg.Data );
|
||||
|
||||
using ByteStream stream = ByteStream.CreateReader( output );
|
||||
|
||||
var nwm = new NetworkSystem.NetworkMessage
|
||||
{
|
||||
Data = stream,
|
||||
Source = this
|
||||
};
|
||||
|
||||
handler( nwm );
|
||||
OnRawPacketReceived( msg.Data, handler );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -226,18 +226,7 @@ internal static partial class SteamNetwork
|
||||
if ( !Connections.TryGetValue( msg.Connection.Id, out var connection ) )
|
||||
continue;
|
||||
|
||||
Span<byte> data = Networking.DecodeStream( msg.Data );
|
||||
|
||||
using var stream = ByteStream.CreateReader( data );
|
||||
|
||||
var nwm = new NetworkSystem.NetworkMessage
|
||||
{
|
||||
Data = stream,
|
||||
Source = connection
|
||||
};
|
||||
|
||||
connection.MessagesRecieved++;
|
||||
handler( nwm );
|
||||
connection.OnRawPacketReceived( msg.Data, handler );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -160,7 +160,7 @@ internal class TcpChannel : Connection
|
||||
}
|
||||
}
|
||||
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags )
|
||||
internal override void InternalSend( byte[] output, NetFlags flags )
|
||||
{
|
||||
if ( !client.Connected )
|
||||
return;
|
||||
@@ -172,8 +172,6 @@ internal class TcpChannel : Connection
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] output = Networking.EncodeStream( stream );
|
||||
|
||||
if ( Networking.FakeLag > 0 )
|
||||
{
|
||||
fakeLagOutgoing.Enqueue( (output, Networking.FakeLag / 1000f) );
|
||||
@@ -231,31 +229,18 @@ internal class TcpChannel : Connection
|
||||
{
|
||||
while ( incoming.Reader.TryRead( out byte[] data ) )
|
||||
{
|
||||
Span<byte> output = Networking.DecodeStream( data );
|
||||
|
||||
if ( Networking.FakeLag > 0 )
|
||||
{
|
||||
fakeLagIncoming.Enqueue( (output.ToArray(), Networking.FakeLag / 1000f, handler) );
|
||||
fakeLagIncoming.Enqueue( (data, Networking.FakeLag / 1000f, handler) );
|
||||
continue;
|
||||
}
|
||||
|
||||
using ByteStream stream = ByteStream.CreateReader( output );
|
||||
|
||||
handler( new NetworkSystem.NetworkMessage
|
||||
{
|
||||
Data = stream,
|
||||
Source = this
|
||||
} );
|
||||
|
||||
MessagesRecieved++;
|
||||
OnRawPacketReceived( data, handler );
|
||||
}
|
||||
}
|
||||
|
||||
private void InvokeMessageHandler( NetworkSystem.MessageHandler handler, byte[] data )
|
||||
{
|
||||
using ByteStream stream = ByteStream.CreateReader( data );
|
||||
handler( new() { Data = stream, Source = this } );
|
||||
|
||||
MessagesRecieved++;
|
||||
OnRawPacketReceived( data, handler );
|
||||
}
|
||||
}
|
||||
|
||||
235
engine/Sandbox.Test.Unit/Network/ConnectionWire.cs
Normal file
235
engine/Sandbox.Test.Unit/Network/ConnectionWire.cs
Normal file
@@ -0,0 +1,235 @@
|
||||
using System;
|
||||
using System.Buffers.Binary;
|
||||
using System.IO;
|
||||
using Sandbox.Compression;
|
||||
|
||||
namespace Networking;
|
||||
|
||||
[TestClass]
|
||||
public class ConnectionWire
|
||||
{
|
||||
[TestMethod]
|
||||
public void RoundTrip_SmallPayload_UsesRawFlag()
|
||||
{
|
||||
using var stream = ByteStream.Create( 32 );
|
||||
stream.Write( 42 );
|
||||
stream.Write( "hello" );
|
||||
|
||||
var encoded = Connection.EncodeStream( stream );
|
||||
|
||||
Assert.AreEqual( Connection.FlagRaw, encoded[0], "Small payload should use FlagRaw" );
|
||||
|
||||
var decoded = Connection.DecodeStream( encoded );
|
||||
var original = stream.ToSpan();
|
||||
Assert.IsTrue( decoded.SequenceEqual( original ) );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void RoundTrip_LargePayload_Compresses()
|
||||
{
|
||||
// Build a payload large enough that LZ4 should actually compress it (repetitive data).
|
||||
using var stream = ByteStream.Create( 1024 );
|
||||
for ( int i = 0; i < 200; i++ )
|
||||
stream.Write( 0x41414141 );
|
||||
|
||||
var encoded = Connection.EncodeStream( stream );
|
||||
|
||||
Assert.AreEqual( Connection.FlagCompressed, encoded[0], "Large repetitive payload should compress" );
|
||||
|
||||
var decoded = Connection.DecodeStream( encoded );
|
||||
var original = stream.ToSpan();
|
||||
Assert.IsTrue( decoded.SequenceEqual( original ) );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void BuildChunkPacket_HeaderFormat()
|
||||
{
|
||||
var payload = new byte[] { 0xAA, 0xBB, 0xCC };
|
||||
var chunk = Connection.BuildChunkPacket( payload, 0, payload.Length, chunkIndex: 2, totalChunks: 5 );
|
||||
|
||||
Assert.AreEqual( Connection.FlagChunk, chunk[0] );
|
||||
Assert.AreEqual( 2u, BinaryPrimitives.ReadUInt32LittleEndian( chunk.AsSpan( 1 ) ) );
|
||||
Assert.AreEqual( 5u, BinaryPrimitives.ReadUInt32LittleEndian( chunk.AsSpan( 5 ) ) );
|
||||
Assert.IsTrue( chunk.AsSpan( 9 ).SequenceEqual( payload ) );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_EmptyPacket_ReturnsEmpty()
|
||||
{
|
||||
var result = Connection.DecodeStream( ReadOnlySpan<byte>.Empty );
|
||||
Assert.AreEqual( 0, result.Length );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_UnknownFlag_Throws()
|
||||
{
|
||||
var packet = new byte[] { 0xFF, 0x01, 0x02 };
|
||||
Assert.ThrowsException<InvalidOperationException>( () =>
|
||||
{
|
||||
Connection.DecodeStream( packet );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_CompressedTooShort_Throws()
|
||||
{
|
||||
// FlagCompressed but no origLen bytes
|
||||
var packet = new byte[] { Connection.FlagCompressed, 0x01 };
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
Connection.DecodeStream( packet );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_CompressedNegativeOrigLen_Throws()
|
||||
{
|
||||
// FlagCompressed + origLen = -1
|
||||
var packet = new byte[1 + sizeof( int )];
|
||||
packet[0] = Connection.FlagCompressed;
|
||||
BinaryPrimitives.WriteInt32LittleEndian( packet.AsSpan( 1 ), -1 );
|
||||
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
Connection.DecodeStream( packet );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_CompressedZeroOrigLen_Throws()
|
||||
{
|
||||
var packet = new byte[1 + sizeof( int )];
|
||||
packet[0] = Connection.FlagCompressed;
|
||||
BinaryPrimitives.WriteInt32LittleEndian( packet.AsSpan( 1 ), 0 );
|
||||
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
Connection.DecodeStream( packet );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_CompressedHugeOrigLen_Throws()
|
||||
{
|
||||
// origLen = int.MaxValue — way above MaxDecompressedSize
|
||||
var packet = new byte[1 + sizeof( int ) + 4];
|
||||
packet[0] = Connection.FlagCompressed;
|
||||
BinaryPrimitives.WriteInt32LittleEndian( packet.AsSpan( 1 ), int.MaxValue );
|
||||
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
Connection.DecodeStream( packet );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void DecodeStream_CompressedSizeMismatch_Throws()
|
||||
{
|
||||
// Craft a valid compressed packet but tamper the origLen to be larger than actual data.
|
||||
var original = new byte[256];
|
||||
Random.Shared.NextBytes( original );
|
||||
var compressed = LZ4.CompressBlock( original );
|
||||
|
||||
// Claim origLen is original.Length + 100 but LZ4 will only decompress original.Length bytes.
|
||||
var packet = new byte[1 + sizeof( int ) + compressed.Length];
|
||||
packet[0] = Connection.FlagCompressed;
|
||||
BinaryPrimitives.WriteInt32LittleEndian( packet.AsSpan( 1 ), original.Length + 100 );
|
||||
compressed.CopyTo( packet.AsSpan( 1 + sizeof( int ) ) );
|
||||
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
Connection.DecodeStream( packet );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssembleChunk_TooShort_Throws()
|
||||
{
|
||||
// FlagChunk header needs 9 bytes minimum
|
||||
var packet = new byte[] { Connection.FlagChunk, 0x00, 0x00, 0x00, 0x00 };
|
||||
|
||||
var conn = new StubConnection();
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
conn.OnRawPacketReceived( packet, StubHandler );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssembleChunk_IndexExceedsTotal_Throws()
|
||||
{
|
||||
// index=5, total=3 — invalid
|
||||
var packet = MakeChunkPacket( chunkIndex: 5, totalChunks: 3, dataLength: 10 );
|
||||
|
||||
var conn = new StubConnection();
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
conn.OnRawPacketReceived( packet, StubHandler );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssembleChunk_TotalOne_Throws()
|
||||
{
|
||||
// total=1 is invalid for chunked messages (should have been sent as a single packet)
|
||||
var packet = MakeChunkPacket( chunkIndex: 0, totalChunks: 1, dataLength: 10 );
|
||||
|
||||
var conn = new StubConnection();
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
conn.OnRawPacketReceived( packet, StubHandler );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssembleChunk_TotalExceedsLimit_Throws()
|
||||
{
|
||||
var packet = MakeChunkPacket( chunkIndex: 0, totalChunks: 2000, dataLength: 10 );
|
||||
|
||||
var conn = new StubConnection();
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
conn.OnRawPacketReceived( packet, StubHandler );
|
||||
} );
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void AssembleChunk_OutOfOrderWithoutFirst_Throws()
|
||||
{
|
||||
// Send chunk index=1 without a preceding index=0 — no assembly in progress
|
||||
var packet = MakeChunkPacket( chunkIndex: 1, totalChunks: 3, dataLength: 10 );
|
||||
|
||||
var conn = new StubConnection();
|
||||
Assert.ThrowsException<InvalidDataException>( () =>
|
||||
{
|
||||
conn.OnRawPacketReceived( packet, StubHandler );
|
||||
} );
|
||||
}
|
||||
|
||||
// Helpers
|
||||
|
||||
private static byte[] MakeChunkPacket( uint chunkIndex, uint totalChunks, int dataLength )
|
||||
{
|
||||
var result = new byte[9 + dataLength];
|
||||
result[0] = Connection.FlagChunk;
|
||||
BinaryPrimitives.WriteUInt32LittleEndian( result.AsSpan( 1 ), chunkIndex );
|
||||
BinaryPrimitives.WriteUInt32LittleEndian( result.AsSpan( 5 ), totalChunks );
|
||||
return result;
|
||||
}
|
||||
|
||||
private static void StubHandler( Sandbox.Network.NetworkSystem.NetworkMessage msg )
|
||||
{
|
||||
// No-op; we only care about whether the receive path throws.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal concrete <see cref="Connection"/> for testing receive-side logic without a real transport.
|
||||
/// </summary>
|
||||
private sealed class StubConnection : Connection
|
||||
{
|
||||
public override bool IsHost => false;
|
||||
internal override void InternalSend( byte[] data, NetFlags flags ) { }
|
||||
internal override void InternalRecv( Sandbox.Network.NetworkSystem.MessageHandler handler ) { }
|
||||
internal override void InternalClose( int closeCode, string closeReason ) { }
|
||||
}
|
||||
}
|
||||
@@ -35,17 +35,19 @@ internal sealed class TestConnection : Connection
|
||||
}
|
||||
}
|
||||
|
||||
internal override void InternalSend( ByteStream stream, NetFlags flags )
|
||||
internal override void InternalSend( byte[] data, NetFlags flags )
|
||||
{
|
||||
var reader = new ByteStream( stream.ToArray() );
|
||||
if ( data[0] == FlagChunk )
|
||||
throw new NotImplementedException( "TestConnection does not support chunked messages" );
|
||||
|
||||
// Decode the wire envelope and dispatch by InternalMessageType.
|
||||
var decoded = Connection.DecodeStream( data );
|
||||
var reader = ByteStream.CreateReader( decoded );
|
||||
|
||||
var type = reader.Read<InternalMessageType>();
|
||||
|
||||
switch ( type )
|
||||
{
|
||||
case InternalMessageType.Chunk:
|
||||
throw new NotImplementedException();
|
||||
|
||||
case InternalMessageType.Packed:
|
||||
Messages.Add( new Message( type, GlobalGameNamespace.TypeLibrary.FromBytes<object>( ref reader ) ) );
|
||||
break;
|
||||
@@ -54,6 +56,8 @@ internal sealed class TestConnection : Connection
|
||||
Messages.Add( new Message( type, reader.GetRemainingBytes().ToArray() ) );
|
||||
break;
|
||||
}
|
||||
|
||||
reader.Dispose();
|
||||
}
|
||||
|
||||
internal override void InternalRecv( NetworkSystem.MessageHandler handler )
|
||||
|
||||
Reference in New Issue
Block a user