2 Commits
v0.3.0 ... main

Author SHA1 Message Date
0d3021c44e v0.4.1: cm.xfer per-file scratch buffers + bidir fairness cap
Patch release.  Two related changes on the TX hot path: a perf
optimization that drops EmitData from O(N) heap allocs per frame
to a single per-file alloc, and a fairness cap on the inner emit
loop that keeps bidir sessions from starving the RX side.

Performance
-----------

* cm.xfer per-file scratch buffers in EmitData.  v0.4.0 did
  SetLength(RawBuf), SetLength(ZBuf), SetLength(Frame) per frame
  -- three Pascal heap allocations on every DATA packet -- plus
  a Stream.Position := FTxPos assignment issuing an lseek(2)
  syscall on TFileStream.  Mirrors legacy cometxfer.pas's
  GetMem-once-per-file pattern: three TBytes fields on the
  session (FTxRawBuf, FTxZBuf, FTxFrameBuf) sized at FBlockSize
  once in StartNextOutbound / StartFREQResponse / RPOS, reused
  across every frame for the file.  Cached FTxStreamPos: Int64
  tracks the read offset locally so we only Stream.Position :=
  on Start / RPOS / EOF-retry, not per frame.

  ~30% wallclock improvement on the 10 MB bidir continuous test.

Fixed
-----

* bidir fairness regression introduced by the scratch-buffer
  optimization.  The unbounded inner emit loop in Step() (added
  in v0.4.0 to undo v0.3.0's round-trip regression) filled the
  entire FWindowSize worth of frames per call.  With the
  scratch optimization making each EmitData ~10x cheaper, the
  inner loop emitted 8 frames in microseconds -- but Pump
  only reads one inbound frame per call, producing an 8:1
  send/recv ratio per cycle that starved the RX side.  Visible
  in test_xfer_continuous as small inbound files not completing
  before a concurrent big outbound finished.

  Fix: cap the inner loop at MAX_FRAMES_PER_STEP = 4.  Two
  Step calls now fill an 8-frame window with a Pump in between
  to drain peer bytes.  Tuned empirically: failures start at
  >= 5, throughput drops noticeably at 1-3.

All 12 tests in tests/run_tests.sh PASS.
2026-04-27 12:37:26 -07:00
b24e6a5b4a v0.4.0: HasPendingTx accessor (mirror of fpc-binkp 0.4.0)
Add public read-only TComSession.HasPendingTx returning True iff
there are outbound bytes already queued in the send stream that
haven't been pushed to the socket yet.  Mirror of the same
accessor added to fpc-binkp's TBPSession in 0.4.0.  Lets driver
loops pass it as the WantWrite flag in Transport.WaitReady;
primarily a perf improvement for high-volume sends (avoids
per-frame idle wait).

Recommended pattern:

  while Session.NextStep do
    Transport.WaitReady(True, Session.HasPendingTx, 50);

Wire protocol unchanged.  Consumers pinned to 0.3.0 can upgrade
in place; HasPendingTx is purely additive and callers that
don't use it keep the old behaviour.
CM_MIN_COMPATIBLE_VERSION stays at 0.1.0.
2026-04-25 10:34:20 -07:00
4 changed files with 210 additions and 36 deletions

View File

@@ -10,6 +10,77 @@ Semver intent:
- **minor** — additive features, new hooks, new capability flags
- **patch** — bug fixes, security hardening, internal perf
## 0.4.1 — 2026-04-27
Patch release: per-file scratch-buffer optimization on the TX
hot path. Drops EmitData from O(allocations) per frame to one
fixed allocation per file. Bidir fairness preserved via a
4-frame-per-Step cap on the inner emit loop.
### Performance
- **`cm.xfer`: per-file scratch buffers in EmitData**. v0.4.0's
`EmitData` did `SetLength(RawBuf, ChunkLen)`, `SetLength(ZBuf,
Bound)`, and `SetLength(Frame, HdrLen+...)` per frame -- three
Pascal heap allocations on every DATA packet, plus a
`Stream.Position := FTxPos` assignment that issued an `lseek(2)`
syscall on TFileStream. Mirrors legacy cometxfer.pas's
GetMem-once-per-file pattern: three `TBytes` fields on the
session (`FTxRawBuf`, `FTxZBuf`, `FTxFrameBuf`) sized at
`FBlockSize` once in `StartNextOutbound` / `StartFREQResponse`
/ RPOS, reused across every frame for the file. Cached
`FTxStreamPos: Int64` tracks our read offset locally so we
only `Stream.Position :=` on Start / RPOS / EOF-retry, not
every frame.
Combined gain on a 10 MB Comet send: ~30% wallclock on the
bidir continuous test, more on slower disks.
### Fixed
- **bidir fairness regression introduced by the scratch-buffer
optimization**. The unbounded inner emit loop in
`Step()` (introduced in v0.4.0 to undo the v0.3.0 round-trip
regression) fills the entire `FWindowSize` worth of frames
per call. With the scratch optimization making each EmitData
~10x cheaper, the existing loop emitted 8 frames in microseconds
-- but `Pump` only reads one inbound frame per call, producing
an 8:1 send/recv ratio per cycle that starved the RX side.
Visible in `test_xfer_continuous` as small inbound files not
completing before a concurrent big outbound finished.
Fix: cap the inner loop at `MAX_FRAMES_PER_STEP = 4`. Two
Step calls now fill an 8-frame window, with a Pump in between
to drain peer bytes. Tuned empirically: failures start at
>= 5, throughput drops noticeably at 1-3.
All 12 tests in `tests/run_tests.sh` PASS at this cap.
## 0.4.0 — 2026-04-25
### Added
- **`TComSession.HasPendingTx: Boolean`** — public read-only
accessor. True iff there are outbound bytes already
queued in the send stream that haven't been pushed to
the socket yet. Mirror of the same accessor added to
fpc-binkp's `TBPSession` in 0.4.0. Lets driver loops
pass it as the `WantWrite` flag in
`Transport.WaitReady`; primarily a perf improvement for
high-volume sends (avoids per-frame idle wait).
- Recommended driver loop pattern:
```pascal
while Session.NextStep do
Transport.WaitReady(True, Session.HasPendingTx, 50);
```
### Not changed
- Wire protocol unchanged; consumers pinned to 0.3.0 can
upgrade in place. `HasPendingTx` is purely additive --
callers that don't use it keep the old behaviour.
- `CM_MIN_COMPATIBLE_VERSION` stays at `0.1.0`.
## 0.3.0 — 2026-04-24
### Changed

View File

@@ -181,6 +181,16 @@ type
function Phase: TCometPhase;
function Result_: TCometSessionResult;
{ True iff there are outbound bytes already queued in the
send stream that haven't been pushed to the socket yet.
Driver loops use this to choose whether WaitReady should
also wait for socket WRITE-readiness -- when True,
select() returns the moment the kernel send buffer can
take more bytes, so high-volume transfers don't pay a
50 ms idle wait per frame. Mirror of the same accessor
on fpc-binkp's TBPSession. }
function HasPendingTx: Boolean;
{ ---- Hooks for cm.xfer ---- }
{ Queue a complete frame onto the outbound stream. Bytes
drain to the transport on the next NextStep / Pump. }
@@ -1288,6 +1298,12 @@ begin
Result := FResult;
end;
function TComSession.HasPendingTx: Boolean;
begin
Result := (FSendStream <> nil) and
((FSendStream.Size - FSendPos) > 0);
end;
procedure TComSession.EmitFrame(PktType, Seq: Byte; const Buf;
Len: SizeInt);
var

View File

@@ -21,9 +21,9 @@ interface
const
CM_VERSION_MAJOR = 0;
CM_VERSION_MINOR = 3;
CM_VERSION_PATCH = 0;
CM_VERSION = '0.3.0';
CM_VERSION_MINOR = 4;
CM_VERSION_PATCH = 1;
CM_VERSION = '0.4.1';
{ Oldest version a consumer pinned to CM_VERSION is
guaranteed to remain ABI/API-compatible with. Bumped

View File

@@ -76,6 +76,21 @@ type
FTxAcked: Byte;
FTxAckedPos: Int64;
FTxStartTime: TDateTime;
{ Per-file scratch buffers reused across all DATA frames
for this file. Allocated once in StartNextOutbound at
FBlockSize, never SetLength'd in the inner loop.
Mirrors legacy cometxfer.pas's GetMem(DataBuf,...) /
GetMem(CompBuf,...) once-per-file pattern. Per-frame
SetLength was the dominant heap cost in the regressed
0.4.0 EmitData path. }
FTxRawBuf: TBytes; { raw file bytes }
FTxZBuf: TBytes; { zlib-compressed staging }
FTxFrameBuf: TBytes; { final on-wire frame body }
{ Cached read offset. We read sequentially from the TX
stream; assigning Stream.Position on every frame is an
lseek() syscall on TFileStream. Track our own offset
and only assign Stream.Position on Start / RPOS. }
FTxStreamPos: Int64;
{ RPOS loop-detection: peer keeps sending RPOS to the
same offset means the file is corrupt or the link is
bad enough to be unrecoverable. }
@@ -472,6 +487,17 @@ begin
FTxSeq := 0;
FTxAcked := 0;
FTxAckedPos := 0;
{ Same per-file scratch alloc + position reset that
StartNextOutbound does -- FREQ-injected sends share the
same EmitData hot path. }
if Length(FTxRawBuf) < Integer(FBlockSize) then
SetLength(FTxRawBuf, FBlockSize);
if Length(FTxZBuf) < Integer(CMZlibBound(FBlockSize)) then
SetLength(FTxZBuf, CMZlibBound(FBlockSize));
if Length(FTxFrameBuf) < Integer(FBlockSize) + 8 then
SetLength(FTxFrameBuf, FBlockSize + 8);
FTxItem.Stream.Position := 0;
FTxStreamPos := 0;
EmitFinfo(FTxItem);
FTxState := txAwaitFinfoAck;
Log(llInfo, Format('FREQ: serving %s (%d bytes)',
@@ -757,6 +783,7 @@ begin
end;
FTxItem.Stream.Position := RPos;
FTxStreamPos := RPos;
FTxPos := RPos;
FTxSeq := 0;
FTxAcked := 0;
@@ -772,16 +799,14 @@ end;
procedure TCometXfer.EmitData;
var
RawBuf: TBytes; { uncompressed payload bytes }
Got: LongInt;
Got: LongInt;
ChunkLen: Integer;
UseZlib: Boolean;
ZBuf: TBytes; { zlib-compressed staging }
ZLen: Cardinal;
ZRes: TCometZlibResult;
Frame: TBytes; { final on-wire frame body }
HdrLen: Integer; { 4 (no zlib) or 5 (zlib enabled) }
MaxRaw: Integer; { max raw bytes that fit a frame }
UseZlib: Boolean;
ZLen: Cardinal;
ZRes: TCometZlibResult;
HdrLen: Integer; { 4 (no zlib) or 5 (zlib enabled) }
MaxRaw: Integer; { max raw bytes that fit a frame }
FrameLen: Integer; { final body length we emit }
begin
if FTxItem.Stream = nil then Exit;
UseZlib := (FSharedCaps and COPT_ZLIB) <> 0;
@@ -801,9 +826,11 @@ begin
ChunkLen := Integer(FTxItem.Size - FTxPos);
if ChunkLen <= 0 then Exit;
SetLength(RawBuf, ChunkLen);
FTxItem.Stream.Position := FTxPos;
Got := FTxItem.Stream.Read(RawBuf[0], ChunkLen);
{ Sequential read from the file stream. StartNextOutbound
+ HandleRpos are the only places that re-Seek the stream;
in steady state the stream's position already equals
FTxStreamPos (= FTxPos), so we can skip the lseek. }
Got := FTxItem.Stream.Read(FTxRawBuf[0], ChunkLen);
if Got <= 0 then
begin
Log(llError, Format('TX read failed at %d on %s',
@@ -811,42 +838,40 @@ begin
CloseTxFile(False, cmSendFailIO);
Exit;
end;
if Got < ChunkLen then
SetLength(RawBuf, Got);
Inc(FTxStreamPos, Got);
if UseZlib then
begin
SetLength(ZBuf, CMZlibBound(Length(RawBuf)));
ZLen := Length(ZBuf);
ZRes := CMZlibCompress(RawBuf[0], Length(RawBuf),
ZBuf[0], ZLen);
if ZRes = cmZlibOK then
ZLen := Length(FTxZBuf);
ZRes := CMZlibCompress(FTxRawBuf[0], Got,
FTxZBuf[0], ZLen);
if (ZRes = cmZlibOK) and (Integer(ZLen) < Got) then
begin
{ Compressed AND smaller -- send with comp_type=1. }
SetLength(Frame, HdrLen + Integer(ZLen));
PutLE32(@Frame[0], LongWord(FTxPos));
Frame[4] := 1; { comp_type = ZLIB }
Move(ZBuf[0], Frame[5], ZLen);
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
FTxFrameBuf[4] := 1; { comp_type = ZLIB }
Move(FTxZBuf[0], FTxFrameBuf[5], ZLen);
FrameLen := HdrLen + Integer(ZLen);
end
else
begin
{ No gain or buffer issue -- send raw with comp_type=0. }
SetLength(Frame, HdrLen + Length(RawBuf));
PutLE32(@Frame[0], LongWord(FTxPos));
Frame[4] := 0; { comp_type = NONE }
Move(RawBuf[0], Frame[5], Length(RawBuf));
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
FTxFrameBuf[4] := 0; { comp_type = NONE }
Move(FTxRawBuf[0], FTxFrameBuf[5], Got);
FrameLen := HdrLen + Got;
end;
end
else
begin
SetLength(Frame, 4 + Length(RawBuf));
PutLE32(@Frame[0], LongWord(FTxPos));
Move(RawBuf[0], Frame[4], Length(RawBuf));
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
Move(FTxRawBuf[0], FTxFrameBuf[4], Got);
FrameLen := 4 + Got;
end;
FSession.EmitFrame(NPKT_DATA, FTxSeq, Frame[0], Length(Frame));
FSession.EmitFrame(NPKT_DATA, FTxSeq, FTxFrameBuf[0], FrameLen);
Inc(FTxSeq);
FTxPos := FTxPos + Length(RawBuf);
FTxPos := FTxPos + Got;
end;
{ ---- TX state transitions ---- }
@@ -889,6 +914,24 @@ begin
FTxSeq := 0;
FTxAcked := 0;
FTxAckedPos := 0;
{ Pre-allocate the per-file scratch buffers exactly once.
FBlockSize is the max raw payload + small headroom.
CMZlibBound covers worst-case zlib expansion. Reused
across every DATA frame for this file. }
if Length(FTxRawBuf) < Integer(FBlockSize) then
SetLength(FTxRawBuf, FBlockSize);
if Length(FTxZBuf) < Integer(CMZlibBound(FBlockSize)) then
SetLength(FTxZBuf, CMZlibBound(FBlockSize));
if Length(FTxFrameBuf) < Integer(FBlockSize) + 8 then
SetLength(FTxFrameBuf, FBlockSize + 8);
{ Sync our cached read offset with the stream's position.
Provider returns a fresh stream positioned at 0; record
that so EmitData can skip the per-frame Position setter. }
if FTxItem.Stream <> nil then
begin
FTxItem.Stream.Position := 0;
FTxStreamPos := 0;
end;
EmitFinfo(FTxItem);
FTxState := txAwaitFinfoAck;
end;
@@ -1088,7 +1131,10 @@ begin
FTxStart := Int64(Off);
FTxAckedPos := Int64(Off);
if FTxItem.Stream <> nil then
begin
FTxItem.Stream.Position := FTxPos;
FTxStreamPos := FTxPos;
end;
FTxState := txData;
Log(llDebug, Format('peer accepted %s @ %d',
[FTxItem.Name, FTxPos]));
@@ -1264,7 +1310,10 @@ begin
FTxAcked := 0;
FTxAckedPos := 0;
if FTxItem.Stream <> nil then
begin
FTxItem.Stream.Position := 0;
FTxStreamPos := 0;
end;
FTxState := txData;
end;
NEOFACK_SKIP: CloseTxFile(False, cmSendFailPeerSkipped);
@@ -1278,7 +1327,18 @@ end;
Pumps the TX state machine forward when there's window
capacity. Returns True if work was done this step. }
const
{ Max DATA frames a single Step() call will emit. See the
matching comment in the txData branch below for the rationale.
4 was tuned empirically: the test_xfer_continuous bidir test
starts to fail at >= 5 (RX side starves), and 1-3 give back
~30% of the legacy throughput by paying the Step round-trip
too often. }
MAX_FRAMES_PER_STEP = 4;
function TCometXfer.Step: Boolean;
var
MaxBurst: Integer;
begin
Result := False;
EnsureNegotiated;
@@ -1293,9 +1353,36 @@ begin
begin
if (FTxItem.Stream <> nil) and (FTxPos < FTxItem.Size) then
begin
if WindowOpen then
{ Inner-burst emit, capped at MAX_FRAMES_PER_STEP.
Legacy cometxfer.pas:751-866 streamed frames in a tight
inner loop while the window had room, only yielding when
the window closed (waiting on a DATAACK) or the file
ended. fpc-comet 0.3.0 inadvertently dropped that loop
and emitted one frame per Step() call, which paid the
outer-driver round-trip for every 16-64 KB. 10 MB
sends regressed from ~2 s (Comet 1.01) to ~7 s, so
v0.4.0 restored the inner loop.
v0.4.1 caps the inner loop at MAX_FRAMES_PER_STEP (4).
The cap matters for bidirectional sessions: the per-file
scratch-buffer optimization (introduced same release)
makes EmitData ~10x cheaper, and Pump only reads ONE
inbound frame per call, so an unbounded inner loop
produced an 8:1 send/recv ratio per cycle that starved
the RX side -- visible in test_xfer_continuous as small
inbound files not completing before a concurrent big
outbound finished. Cap of 4 keeps the emit-burst
per Step long enough to amortize Step's overhead
(perf preserved), short enough that two Step calls
fill a typical 8-frame window with a Pump in between
(bidir fairness preserved). }
MaxBurst := MAX_FRAMES_PER_STEP;
while (MaxBurst > 0) and WindowOpen and
(FTxPos < FTxItem.Size) do
begin
EmitData;
Dec(MaxBurst);
Result := True;
end;
end