1 Commits
v0.4.0 ... main

Author SHA1 Message Date
0d3021c44e v0.4.1: cm.xfer per-file scratch buffers + bidir fairness cap
Patch release.  Two related changes on the TX hot path: a perf
optimization that drops EmitData from O(N) heap allocs per frame
to a single per-file alloc, and a fairness cap on the inner emit
loop that keeps bidir sessions from starving the RX side.

Performance
-----------

* cm.xfer per-file scratch buffers in EmitData.  v0.4.0 did
  SetLength(RawBuf), SetLength(ZBuf), SetLength(Frame) per frame
  -- three Pascal heap allocations on every DATA packet -- plus
  a Stream.Position := FTxPos assignment issuing an lseek(2)
  syscall on TFileStream.  Mirrors legacy cometxfer.pas's
  GetMem-once-per-file pattern: three TBytes fields on the
  session (FTxRawBuf, FTxZBuf, FTxFrameBuf) sized at FBlockSize
  once in StartNextOutbound / StartFREQResponse / RPOS, reused
  across every frame for the file.  Cached FTxStreamPos: Int64
  tracks the read offset locally so we only Stream.Position :=
  on Start / RPOS / EOF-retry, not per frame.

  ~30% wallclock improvement on the 10 MB bidir continuous test.

Fixed
-----

* bidir fairness regression introduced by the scratch-buffer
  optimization.  The unbounded inner emit loop in Step() (added
  in v0.4.0 to undo v0.3.0's round-trip regression) filled the
  entire FWindowSize worth of frames per call.  With the
  scratch optimization making each EmitData ~10x cheaper, the
  inner loop emitted 8 frames in microseconds -- but Pump
  only reads one inbound frame per call, producing an 8:1
  send/recv ratio per cycle that starved the RX side.  Visible
  in test_xfer_continuous as small inbound files not completing
  before a concurrent big outbound finished.

  Fix: cap the inner loop at MAX_FRAMES_PER_STEP = 4.  Two
  Step calls now fill an 8-frame window with a Pump in between
  to drain peer bytes.  Tuned empirically: failures start at
  >= 5, throughput drops noticeably at 1-3.

All 12 tests in tests/run_tests.sh PASS.
2026-04-27 12:37:26 -07:00
3 changed files with 168 additions and 35 deletions

View File

@@ -10,6 +10,52 @@ Semver intent:
- **minor** — additive features, new hooks, new capability flags
- **patch** — bug fixes, security hardening, internal perf
## 0.4.1 — 2026-04-27
Patch release: per-file scratch-buffer optimization on the TX
hot path. Drops EmitData from O(allocations) per frame to one
fixed allocation per file. Bidir fairness preserved via a
4-frame-per-Step cap on the inner emit loop.
### Performance
- **`cm.xfer`: per-file scratch buffers in EmitData**. v0.4.0's
`EmitData` did `SetLength(RawBuf, ChunkLen)`, `SetLength(ZBuf,
Bound)`, and `SetLength(Frame, HdrLen+...)` per frame -- three
Pascal heap allocations on every DATA packet, plus a
`Stream.Position := FTxPos` assignment that issued an `lseek(2)`
syscall on TFileStream. Mirrors legacy cometxfer.pas's
GetMem-once-per-file pattern: three `TBytes` fields on the
session (`FTxRawBuf`, `FTxZBuf`, `FTxFrameBuf`) sized at
`FBlockSize` once in `StartNextOutbound` / `StartFREQResponse`
/ RPOS, reused across every frame for the file. Cached
`FTxStreamPos: Int64` tracks our read offset locally so we
only `Stream.Position :=` on Start / RPOS / EOF-retry, not
every frame.
Combined gain on a 10 MB Comet send: ~30% wallclock on the
bidir continuous test, more on slower disks.
### Fixed
- **bidir fairness regression introduced by the scratch-buffer
optimization**. The unbounded inner emit loop in
`Step()` (introduced in v0.4.0 to undo the v0.3.0 round-trip
regression) fills the entire `FWindowSize` worth of frames
per call. With the scratch optimization making each EmitData
~10x cheaper, the existing loop emitted 8 frames in microseconds
-- but `Pump` only reads one inbound frame per call, producing
an 8:1 send/recv ratio per cycle that starved the RX side.
Visible in `test_xfer_continuous` as small inbound files not
completing before a concurrent big outbound finished.
Fix: cap the inner loop at `MAX_FRAMES_PER_STEP = 4`. Two
Step calls now fill an 8-frame window, with a Pump in between
to drain peer bytes. Tuned empirically: failures start at
>= 5, throughput drops noticeably at 1-3.
All 12 tests in `tests/run_tests.sh` PASS at this cap.
## 0.4.0 — 2026-04-25
### Added

View File

@@ -22,8 +22,8 @@ interface
const
CM_VERSION_MAJOR = 0;
CM_VERSION_MINOR = 4;
CM_VERSION_PATCH = 0;
CM_VERSION = '0.4.0';
CM_VERSION_PATCH = 1;
CM_VERSION = '0.4.1';
{ Oldest version a consumer pinned to CM_VERSION is
guaranteed to remain ABI/API-compatible with. Bumped

View File

@@ -76,6 +76,21 @@ type
FTxAcked: Byte;
FTxAckedPos: Int64;
FTxStartTime: TDateTime;
{ Per-file scratch buffers reused across all DATA frames
for this file. Allocated once in StartNextOutbound at
FBlockSize, never SetLength'd in the inner loop.
Mirrors legacy cometxfer.pas's GetMem(DataBuf,...) /
GetMem(CompBuf,...) once-per-file pattern. Per-frame
SetLength was the dominant heap cost in the regressed
0.4.0 EmitData path. }
FTxRawBuf: TBytes; { raw file bytes }
FTxZBuf: TBytes; { zlib-compressed staging }
FTxFrameBuf: TBytes; { final on-wire frame body }
{ Cached read offset. We read sequentially from the TX
stream; assigning Stream.Position on every frame is an
lseek() syscall on TFileStream. Track our own offset
and only assign Stream.Position on Start / RPOS. }
FTxStreamPos: Int64;
{ RPOS loop-detection: peer keeps sending RPOS to the
same offset means the file is corrupt or the link is
bad enough to be unrecoverable. }
@@ -472,6 +487,17 @@ begin
FTxSeq := 0;
FTxAcked := 0;
FTxAckedPos := 0;
{ Same per-file scratch alloc + position reset that
StartNextOutbound does -- FREQ-injected sends share the
same EmitData hot path. }
if Length(FTxRawBuf) < Integer(FBlockSize) then
SetLength(FTxRawBuf, FBlockSize);
if Length(FTxZBuf) < Integer(CMZlibBound(FBlockSize)) then
SetLength(FTxZBuf, CMZlibBound(FBlockSize));
if Length(FTxFrameBuf) < Integer(FBlockSize) + 8 then
SetLength(FTxFrameBuf, FBlockSize + 8);
FTxItem.Stream.Position := 0;
FTxStreamPos := 0;
EmitFinfo(FTxItem);
FTxState := txAwaitFinfoAck;
Log(llInfo, Format('FREQ: serving %s (%d bytes)',
@@ -757,6 +783,7 @@ begin
end;
FTxItem.Stream.Position := RPos;
FTxStreamPos := RPos;
FTxPos := RPos;
FTxSeq := 0;
FTxAcked := 0;
@@ -772,16 +799,14 @@ end;
procedure TCometXfer.EmitData;
var
RawBuf: TBytes; { uncompressed payload bytes }
Got: LongInt;
Got: LongInt;
ChunkLen: Integer;
UseZlib: Boolean;
ZBuf: TBytes; { zlib-compressed staging }
ZLen: Cardinal;
ZRes: TCometZlibResult;
Frame: TBytes; { final on-wire frame body }
HdrLen: Integer; { 4 (no zlib) or 5 (zlib enabled) }
MaxRaw: Integer; { max raw bytes that fit a frame }
UseZlib: Boolean;
ZLen: Cardinal;
ZRes: TCometZlibResult;
HdrLen: Integer; { 4 (no zlib) or 5 (zlib enabled) }
MaxRaw: Integer; { max raw bytes that fit a frame }
FrameLen: Integer; { final body length we emit }
begin
if FTxItem.Stream = nil then Exit;
UseZlib := (FSharedCaps and COPT_ZLIB) <> 0;
@@ -801,9 +826,11 @@ begin
ChunkLen := Integer(FTxItem.Size - FTxPos);
if ChunkLen <= 0 then Exit;
SetLength(RawBuf, ChunkLen);
FTxItem.Stream.Position := FTxPos;
Got := FTxItem.Stream.Read(RawBuf[0], ChunkLen);
{ Sequential read from the file stream. StartNextOutbound
+ HandleRpos are the only places that re-Seek the stream;
in steady state the stream's position already equals
FTxStreamPos (= FTxPos), so we can skip the lseek. }
Got := FTxItem.Stream.Read(FTxRawBuf[0], ChunkLen);
if Got <= 0 then
begin
Log(llError, Format('TX read failed at %d on %s',
@@ -811,42 +838,40 @@ begin
CloseTxFile(False, cmSendFailIO);
Exit;
end;
if Got < ChunkLen then
SetLength(RawBuf, Got);
Inc(FTxStreamPos, Got);
if UseZlib then
begin
SetLength(ZBuf, CMZlibBound(Length(RawBuf)));
ZLen := Length(ZBuf);
ZRes := CMZlibCompress(RawBuf[0], Length(RawBuf),
ZBuf[0], ZLen);
if ZRes = cmZlibOK then
ZLen := Length(FTxZBuf);
ZRes := CMZlibCompress(FTxRawBuf[0], Got,
FTxZBuf[0], ZLen);
if (ZRes = cmZlibOK) and (Integer(ZLen) < Got) then
begin
{ Compressed AND smaller -- send with comp_type=1. }
SetLength(Frame, HdrLen + Integer(ZLen));
PutLE32(@Frame[0], LongWord(FTxPos));
Frame[4] := 1; { comp_type = ZLIB }
Move(ZBuf[0], Frame[5], ZLen);
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
FTxFrameBuf[4] := 1; { comp_type = ZLIB }
Move(FTxZBuf[0], FTxFrameBuf[5], ZLen);
FrameLen := HdrLen + Integer(ZLen);
end
else
begin
{ No gain or buffer issue -- send raw with comp_type=0. }
SetLength(Frame, HdrLen + Length(RawBuf));
PutLE32(@Frame[0], LongWord(FTxPos));
Frame[4] := 0; { comp_type = NONE }
Move(RawBuf[0], Frame[5], Length(RawBuf));
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
FTxFrameBuf[4] := 0; { comp_type = NONE }
Move(FTxRawBuf[0], FTxFrameBuf[5], Got);
FrameLen := HdrLen + Got;
end;
end
else
begin
SetLength(Frame, 4 + Length(RawBuf));
PutLE32(@Frame[0], LongWord(FTxPos));
Move(RawBuf[0], Frame[4], Length(RawBuf));
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
Move(FTxRawBuf[0], FTxFrameBuf[4], Got);
FrameLen := 4 + Got;
end;
FSession.EmitFrame(NPKT_DATA, FTxSeq, Frame[0], Length(Frame));
FSession.EmitFrame(NPKT_DATA, FTxSeq, FTxFrameBuf[0], FrameLen);
Inc(FTxSeq);
FTxPos := FTxPos + Length(RawBuf);
FTxPos := FTxPos + Got;
end;
{ ---- TX state transitions ---- }
@@ -889,6 +914,24 @@ begin
FTxSeq := 0;
FTxAcked := 0;
FTxAckedPos := 0;
{ Pre-allocate the per-file scratch buffers exactly once.
FBlockSize is the max raw payload + small headroom.
CMZlibBound covers worst-case zlib expansion. Reused
across every DATA frame for this file. }
if Length(FTxRawBuf) < Integer(FBlockSize) then
SetLength(FTxRawBuf, FBlockSize);
if Length(FTxZBuf) < Integer(CMZlibBound(FBlockSize)) then
SetLength(FTxZBuf, CMZlibBound(FBlockSize));
if Length(FTxFrameBuf) < Integer(FBlockSize) + 8 then
SetLength(FTxFrameBuf, FBlockSize + 8);
{ Sync our cached read offset with the stream's position.
Provider returns a fresh stream positioned at 0; record
that so EmitData can skip the per-frame Position setter. }
if FTxItem.Stream <> nil then
begin
FTxItem.Stream.Position := 0;
FTxStreamPos := 0;
end;
EmitFinfo(FTxItem);
FTxState := txAwaitFinfoAck;
end;
@@ -1088,7 +1131,10 @@ begin
FTxStart := Int64(Off);
FTxAckedPos := Int64(Off);
if FTxItem.Stream <> nil then
begin
FTxItem.Stream.Position := FTxPos;
FTxStreamPos := FTxPos;
end;
FTxState := txData;
Log(llDebug, Format('peer accepted %s @ %d',
[FTxItem.Name, FTxPos]));
@@ -1264,7 +1310,10 @@ begin
FTxAcked := 0;
FTxAckedPos := 0;
if FTxItem.Stream <> nil then
begin
FTxItem.Stream.Position := 0;
FTxStreamPos := 0;
end;
FTxState := txData;
end;
NEOFACK_SKIP: CloseTxFile(False, cmSendFailPeerSkipped);
@@ -1278,7 +1327,18 @@ end;
Pumps the TX state machine forward when there's window
capacity. Returns True if work was done this step. }
const
{ Max DATA frames a single Step() call will emit. See the
matching comment in the txData branch below for the rationale.
4 was tuned empirically: the test_xfer_continuous bidir test
starts to fail at >= 5 (RX side starves), and 1-3 give back
~30% of the legacy throughput by paying the Step round-trip
too often. }
MAX_FRAMES_PER_STEP = 4;
function TCometXfer.Step: Boolean;
var
MaxBurst: Integer;
begin
Result := False;
EnsureNegotiated;
@@ -1293,9 +1353,36 @@ begin
begin
if (FTxItem.Stream <> nil) and (FTxPos < FTxItem.Size) then
begin
if WindowOpen then
{ Inner-burst emit, capped at MAX_FRAMES_PER_STEP.
Legacy cometxfer.pas:751-866 streamed frames in a tight
inner loop while the window had room, only yielding when
the window closed (waiting on a DATAACK) or the file
ended. fpc-comet 0.3.0 inadvertently dropped that loop
and emitted one frame per Step() call, which paid the
outer-driver round-trip for every 16-64 KB. 10 MB
sends regressed from ~2 s (Comet 1.01) to ~7 s, so
v0.4.0 restored the inner loop.
v0.4.1 caps the inner loop at MAX_FRAMES_PER_STEP (4).
The cap matters for bidirectional sessions: the per-file
scratch-buffer optimization (introduced same release)
makes EmitData ~10x cheaper, and Pump only reads ONE
inbound frame per call, so an unbounded inner loop
produced an 8:1 send/recv ratio per cycle that starved
the RX side -- visible in test_xfer_continuous as small
inbound files not completing before a concurrent big
outbound finished. Cap of 4 keeps the emit-burst
per Step long enough to amortize Step's overhead
(perf preserved), short enough that two Step calls
fill a typical 8-frame window with a Pump in between
(bidir fairness preserved). }
MaxBurst := MAX_FRAMES_PER_STEP;
while (MaxBurst > 0) and WindowOpen and
(FTxPos < FTxItem.Size) do
begin
EmitData;
Dec(MaxBurst);
Result := True;
end;
end