Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0d3021c44e | |||
| b24e6a5b4a |
71
CHANGELOG.md
71
CHANGELOG.md
@@ -10,6 +10,77 @@ Semver intent:
|
||||
- **minor** — additive features, new hooks, new capability flags
|
||||
- **patch** — bug fixes, security hardening, internal perf
|
||||
|
||||
## 0.4.1 — 2026-04-27
|
||||
|
||||
Patch release: per-file scratch-buffer optimization on the TX
|
||||
hot path. Drops EmitData from three heap allocations per frame to
|
||||
three buffers allocated once per file. Bidir fairness preserved via a
|
||||
4-frame-per-Step cap on the inner emit loop.
|
||||
|
||||
### Performance
|
||||
|
||||
- **`cm.xfer`: per-file scratch buffers in EmitData**. v0.4.0's
|
||||
`EmitData` did `SetLength(RawBuf, ChunkLen)`, `SetLength(ZBuf,
|
||||
Bound)`, and `SetLength(Frame, HdrLen+...)` per frame -- three
|
||||
Pascal heap allocations on every DATA packet, plus a
|
||||
`Stream.Position := FTxPos` assignment that issued an `lseek(2)`
|
||||
syscall on TFileStream. The new code mirrors legacy cometxfer.pas's
|
||||
GetMem-once-per-file pattern: three `TBytes` fields on the
|
||||
session (`FTxRawBuf`, `FTxZBuf`, `FTxFrameBuf`) sized at
|
||||
`FBlockSize` once in `StartNextOutbound` / `StartFREQResponse`
|
||||
/ RPOS, reused across every frame for the file. Cached
|
||||
`FTxStreamPos: Int64` tracks our read offset locally so we
|
||||
only `Stream.Position :=` on Start / RPOS / EOF-retry, not
|
||||
every frame.
|
||||
|
||||
Combined gain on a 10 MB Comet send: ~30% wallclock on the
|
||||
bidir continuous test, more on slower disks.
|
||||
|
||||
### Fixed
|
||||
|
||||
- **bidir fairness regression introduced by the scratch-buffer
|
||||
optimization**. The unbounded inner emit loop in
|
||||
`Step()` (introduced in v0.4.0 to undo the v0.3.0 round-trip
|
||||
regression) fills the entire `FWindowSize` worth of frames
|
||||
per call. With the scratch optimization making each EmitData
|
||||
~10x cheaper, the existing loop emitted 8 frames in microseconds
|
||||
-- but `Pump` only reads one inbound frame per call, producing
|
||||
an 8:1 send/recv ratio per cycle that starved the RX side.
|
||||
Visible in `test_xfer_continuous` as small inbound files not
|
||||
completing before a concurrent big outbound finished.
|
||||
|
||||
Fix: cap the inner loop at `MAX_FRAMES_PER_STEP = 4`. Two
|
||||
Step calls now fill an 8-frame window, with a Pump in between
|
||||
to drain peer bytes. Tuned empirically: failures start at >= 5,
|
||||
throughput drops noticeably at 1-3.
|
||||
|
||||
All 12 tests in `tests/run_tests.sh` PASS at this cap.
|
||||
|
||||
## 0.4.0 — 2026-04-25
|
||||
|
||||
### Added
|
||||
|
||||
- **`TComSession.HasPendingTx: Boolean`** — public read-only
|
||||
accessor. True iff there are outbound bytes already
|
||||
queued in the send stream that haven't been pushed to
|
||||
the socket yet. Mirror of the same accessor added to
|
||||
fpc-binkp's `TBPSession` in 0.4.0. Lets driver loops
|
||||
pass it as the `WantWrite` flag in
|
||||
`Transport.WaitReady`; primarily a perf improvement for
|
||||
high-volume sends (avoids per-frame idle wait).
|
||||
- Recommended driver loop pattern:
|
||||
```pascal
|
||||
while Session.NextStep do
|
||||
Transport.WaitReady(True, Session.HasPendingTx, 50);
|
||||
```
|
||||
|
||||
### Not changed
|
||||
|
||||
- Wire protocol unchanged; consumers pinned to 0.3.0 can
|
||||
upgrade in place. `HasPendingTx` is purely additive --
|
||||
callers that don't use it keep the old behaviour.
|
||||
- `CM_MIN_COMPATIBLE_VERSION` stays at `0.1.0`.
|
||||
|
||||
## 0.3.0 — 2026-04-24
|
||||
|
||||
### Changed
|
||||
|
||||
@@ -181,6 +181,16 @@ type
|
||||
function Phase: TCometPhase;
|
||||
function Result_: TCometSessionResult;
|
||||
|
||||
{ True iff there are outbound bytes already queued in the
|
||||
send stream that haven't been pushed to the socket yet.
|
||||
Driver loops use this to choose whether WaitReady should
|
||||
also wait for socket WRITE-readiness -- when True,
|
||||
select() returns the moment the kernel send buffer can
|
||||
take more bytes, so high-volume transfers don't pay a
|
||||
50 ms idle wait per frame. Mirror of the same accessor
|
||||
on fpc-binkp's TBPSession. }
|
||||
function HasPendingTx: Boolean;
|
||||
|
||||
{ ---- Hooks for cm.xfer ---- }
|
||||
{ Queue a complete frame onto the outbound stream. Bytes
|
||||
drain to the transport on the next NextStep / Pump. }
|
||||
@@ -1288,6 +1298,12 @@ begin
|
||||
Result := FResult;
|
||||
end;
|
||||
|
||||
function TComSession.HasPendingTx: Boolean;
|
||||
begin
|
||||
Result := (FSendStream <> nil) and
|
||||
((FSendStream.Size - FSendPos) > 0);
|
||||
end;
|
||||
|
||||
procedure TComSession.EmitFrame(PktType, Seq: Byte; const Buf;
|
||||
Len: SizeInt);
|
||||
var
|
||||
|
||||
@@ -21,9 +21,9 @@ interface
|
||||
|
||||
const
|
||||
CM_VERSION_MAJOR = 0;
|
||||
CM_VERSION_MINOR = 3;
|
||||
CM_VERSION_PATCH = 0;
|
||||
CM_VERSION = '0.3.0';
|
||||
CM_VERSION_MINOR = 4;
|
||||
CM_VERSION_PATCH = 1;
|
||||
CM_VERSION = '0.4.1';
|
||||
|
||||
{ Oldest version a consumer pinned to CM_VERSION is
|
||||
guaranteed to remain ABI/API-compatible with. Bumped
|
||||
|
||||
153
src/cm.xfer.pas
153
src/cm.xfer.pas
@@ -76,6 +76,21 @@ type
|
||||
FTxAcked: Byte;
|
||||
FTxAckedPos: Int64;
|
||||
FTxStartTime: TDateTime;
|
||||
{ Per-file scratch buffers reused across all DATA frames
|
||||
for this file. Sized from FBlockSize when a file starts
|
||||
(normal and FREQ sends), never SetLength'd in the inner loop.
|
||||
Mirrors legacy cometxfer.pas's GetMem(DataBuf,...) /
|
||||
GetMem(CompBuf,...) once-per-file pattern. Per-frame
|
||||
SetLength was the dominant heap cost in the regressed
|
||||
0.4.0 EmitData path. }
|
||||
FTxRawBuf: TBytes; { raw file bytes }
|
||||
FTxZBuf: TBytes; { zlib-compressed staging }
|
||||
FTxFrameBuf: TBytes; { final on-wire frame body }
|
||||
{ Cached read offset. We read sequentially from the TX
|
||||
stream; assigning Stream.Position on every frame is an
|
||||
lseek() syscall on TFileStream. Track our own offset
|
||||
and only assign Stream.Position on Start / accept-at-offset / RPOS / EOF-retry. }
|
||||
FTxStreamPos: Int64;
|
||||
{ RPOS loop-detection: peer keeps sending RPOS to the
|
||||
same offset means the file is corrupt or the link is
|
||||
bad enough to be unrecoverable. }
|
||||
@@ -472,6 +487,17 @@ begin
|
||||
FTxSeq := 0;
|
||||
FTxAcked := 0;
|
||||
FTxAckedPos := 0;
|
||||
{ Same per-file scratch alloc + position reset that
|
||||
StartNextOutbound does -- FREQ-injected sends share the
|
||||
same EmitData hot path. }
|
||||
if Length(FTxRawBuf) < Integer(FBlockSize) then
|
||||
SetLength(FTxRawBuf, FBlockSize);
|
||||
if Length(FTxZBuf) < Integer(CMZlibBound(FBlockSize)) then
|
||||
SetLength(FTxZBuf, CMZlibBound(FBlockSize));
|
||||
if Length(FTxFrameBuf) < Integer(FBlockSize) + 8 then
|
||||
SetLength(FTxFrameBuf, FBlockSize + 8);
|
||||
FTxItem.Stream.Position := 0;
|
||||
FTxStreamPos := 0;
|
||||
EmitFinfo(FTxItem);
|
||||
FTxState := txAwaitFinfoAck;
|
||||
Log(llInfo, Format('FREQ: serving %s (%d bytes)',
|
||||
@@ -757,6 +783,7 @@ begin
|
||||
end;
|
||||
|
||||
FTxItem.Stream.Position := RPos;
|
||||
FTxStreamPos := RPos;
|
||||
FTxPos := RPos;
|
||||
FTxSeq := 0;
|
||||
FTxAcked := 0;
|
||||
@@ -772,16 +799,14 @@ end;
|
||||
|
||||
procedure TCometXfer.EmitData;
|
||||
var
|
||||
RawBuf: TBytes; { uncompressed payload bytes }
|
||||
Got: LongInt;
|
||||
Got: LongInt;
|
||||
ChunkLen: Integer;
|
||||
UseZlib: Boolean;
|
||||
ZBuf: TBytes; { zlib-compressed staging }
|
||||
ZLen: Cardinal;
|
||||
ZRes: TCometZlibResult;
|
||||
Frame: TBytes; { final on-wire frame body }
|
||||
HdrLen: Integer; { 4 (no zlib) or 5 (zlib enabled) }
|
||||
MaxRaw: Integer; { max raw bytes that fit a frame }
|
||||
UseZlib: Boolean;
|
||||
ZLen: Cardinal;
|
||||
ZRes: TCometZlibResult;
|
||||
HdrLen: Integer; { 4 (no zlib) or 5 (zlib enabled) }
|
||||
MaxRaw: Integer; { max raw bytes that fit a frame }
|
||||
FrameLen: Integer; { final body length we emit }
|
||||
begin
|
||||
if FTxItem.Stream = nil then Exit;
|
||||
UseZlib := (FSharedCaps and COPT_ZLIB) <> 0;
|
||||
@@ -801,9 +826,11 @@ begin
|
||||
ChunkLen := Integer(FTxItem.Size - FTxPos);
|
||||
if ChunkLen <= 0 then Exit;
|
||||
|
||||
SetLength(RawBuf, ChunkLen);
|
||||
FTxItem.Stream.Position := FTxPos;
|
||||
Got := FTxItem.Stream.Read(RawBuf[0], ChunkLen);
|
||||
{ Sequential read from the file stream. Only the file-start,
|
||||
accept-at-offset, RPOS and EOF-retry paths re-Seek the stream;
|
||||
in steady state the stream's position already equals
|
||||
FTxStreamPos (= FTxPos), so we can skip the lseek. }
|
||||
Got := FTxItem.Stream.Read(FTxRawBuf[0], ChunkLen);
|
||||
if Got <= 0 then
|
||||
begin
|
||||
Log(llError, Format('TX read failed at %d on %s',
|
||||
@@ -811,42 +838,40 @@ begin
|
||||
CloseTxFile(False, cmSendFailIO);
|
||||
Exit;
|
||||
end;
|
||||
if Got < ChunkLen then
|
||||
SetLength(RawBuf, Got);
|
||||
Inc(FTxStreamPos, Got);
|
||||
|
||||
if UseZlib then
|
||||
begin
|
||||
SetLength(ZBuf, CMZlibBound(Length(RawBuf)));
|
||||
ZLen := Length(ZBuf);
|
||||
ZRes := CMZlibCompress(RawBuf[0], Length(RawBuf),
|
||||
ZBuf[0], ZLen);
|
||||
if ZRes = cmZlibOK then
|
||||
ZLen := Length(FTxZBuf);
|
||||
ZRes := CMZlibCompress(FTxRawBuf[0], Got,
|
||||
FTxZBuf[0], ZLen);
|
||||
if (ZRes = cmZlibOK) and (Integer(ZLen) < Got) then
|
||||
begin
|
||||
{ Compressed AND smaller -- send with comp_type=1. }
|
||||
SetLength(Frame, HdrLen + Integer(ZLen));
|
||||
PutLE32(@Frame[0], LongWord(FTxPos));
|
||||
Frame[4] := 1; { comp_type = ZLIB }
|
||||
Move(ZBuf[0], Frame[5], ZLen);
|
||||
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
|
||||
FTxFrameBuf[4] := 1; { comp_type = ZLIB }
|
||||
Move(FTxZBuf[0], FTxFrameBuf[5], ZLen);
|
||||
FrameLen := HdrLen + Integer(ZLen);
|
||||
end
|
||||
else
|
||||
begin
|
||||
{ No gain or buffer issue -- send raw with comp_type=0. }
|
||||
SetLength(Frame, HdrLen + Length(RawBuf));
|
||||
PutLE32(@Frame[0], LongWord(FTxPos));
|
||||
Frame[4] := 0; { comp_type = NONE }
|
||||
Move(RawBuf[0], Frame[5], Length(RawBuf));
|
||||
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
|
||||
FTxFrameBuf[4] := 0; { comp_type = NONE }
|
||||
Move(FTxRawBuf[0], FTxFrameBuf[5], Got);
|
||||
FrameLen := HdrLen + Got;
|
||||
end;
|
||||
end
|
||||
else
|
||||
begin
|
||||
SetLength(Frame, 4 + Length(RawBuf));
|
||||
PutLE32(@Frame[0], LongWord(FTxPos));
|
||||
Move(RawBuf[0], Frame[4], Length(RawBuf));
|
||||
PutLE32(@FTxFrameBuf[0], LongWord(FTxPos));
|
||||
Move(FTxRawBuf[0], FTxFrameBuf[4], Got);
|
||||
FrameLen := 4 + Got;
|
||||
end;
|
||||
|
||||
FSession.EmitFrame(NPKT_DATA, FTxSeq, Frame[0], Length(Frame));
|
||||
FSession.EmitFrame(NPKT_DATA, FTxSeq, FTxFrameBuf[0], FrameLen);
|
||||
Inc(FTxSeq);
|
||||
FTxPos := FTxPos + Length(RawBuf);
|
||||
FTxPos := FTxPos + Got;
|
||||
end;
|
||||
|
||||
{ ---- TX state transitions ---- }
|
||||
@@ -889,6 +914,24 @@ begin
|
||||
FTxSeq := 0;
|
||||
FTxAcked := 0;
|
||||
FTxAckedPos := 0;
|
||||
{ Pre-allocate the per-file scratch buffers exactly once.
|
||||
FBlockSize is the max raw payload + small headroom.
|
||||
CMZlibBound covers worst-case zlib expansion. Reused
|
||||
across every DATA frame for this file. }
|
||||
if Length(FTxRawBuf) < Integer(FBlockSize) then
|
||||
SetLength(FTxRawBuf, FBlockSize);
|
||||
if Length(FTxZBuf) < Integer(CMZlibBound(FBlockSize)) then
|
||||
SetLength(FTxZBuf, CMZlibBound(FBlockSize));
|
||||
if Length(FTxFrameBuf) < Integer(FBlockSize) + 8 then
|
||||
SetLength(FTxFrameBuf, FBlockSize + 8);
|
||||
{ Sync our cached read offset with the stream's position.
|
||||
Provider returns a fresh stream positioned at 0; record
|
||||
that so EmitData can skip the per-frame Position setter. }
|
||||
if FTxItem.Stream <> nil then
|
||||
begin
|
||||
FTxItem.Stream.Position := 0;
|
||||
FTxStreamPos := 0;
|
||||
end;
|
||||
EmitFinfo(FTxItem);
|
||||
FTxState := txAwaitFinfoAck;
|
||||
end;
|
||||
@@ -1088,7 +1131,10 @@ begin
|
||||
FTxStart := Int64(Off);
|
||||
FTxAckedPos := Int64(Off);
|
||||
if FTxItem.Stream <> nil then
|
||||
begin
|
||||
FTxItem.Stream.Position := FTxPos;
|
||||
FTxStreamPos := FTxPos;
|
||||
end;
|
||||
FTxState := txData;
|
||||
Log(llDebug, Format('peer accepted %s @ %d',
|
||||
[FTxItem.Name, FTxPos]));
|
||||
@@ -1264,7 +1310,10 @@ begin
|
||||
FTxAcked := 0;
|
||||
FTxAckedPos := 0;
|
||||
if FTxItem.Stream <> nil then
|
||||
begin
|
||||
FTxItem.Stream.Position := 0;
|
||||
FTxStreamPos := 0;
|
||||
end;
|
||||
FTxState := txData;
|
||||
end;
|
||||
NEOFACK_SKIP: CloseTxFile(False, cmSendFailPeerSkipped);
|
||||
@@ -1278,7 +1327,18 @@ end;
|
||||
Pumps the TX state machine forward when there's window
|
||||
capacity. Returns True if work was done this step. }
|
||||
|
||||
const
|
||||
{ Max DATA frames a single Step() call will emit. See the
|
||||
matching comment in the txData branch below for the rationale.
|
||||
4 was tuned empirically: the test_xfer_continuous bidir test
|
||||
starts to fail at >= 5 (RX side starves), and 1-3 lose
|
||||
~30% of the legacy throughput by paying the Step round-trip
|
||||
too often. }
|
||||
MAX_FRAMES_PER_STEP = 4;
|
||||
|
||||
function TCometXfer.Step: Boolean;
|
||||
var
|
||||
MaxBurst: Integer;
|
||||
begin
|
||||
Result := False;
|
||||
EnsureNegotiated;
|
||||
@@ -1293,9 +1353,36 @@ begin
|
||||
begin
|
||||
if (FTxItem.Stream <> nil) and (FTxPos < FTxItem.Size) then
|
||||
begin
|
||||
if WindowOpen then
|
||||
{ Inner-burst emit, capped at MAX_FRAMES_PER_STEP.
|
||||
|
||||
Legacy cometxfer.pas:751-866 streamed frames in a tight
|
||||
inner loop while the window had room, only yielding when
|
||||
the window closed (waiting on a DATAACK) or the file
|
||||
ended. fpc-comet 0.3.0 inadvertently dropped that loop
|
||||
and emitted one frame per Step() call, which paid the
|
||||
outer-driver round-trip for every 16-64 KB. 10 MB
|
||||
sends regressed from ~2 s (Comet 1.01) to ~7 s, so
|
||||
v0.4.0 restored the inner loop.
|
||||
|
||||
v0.4.1 caps the inner loop at MAX_FRAMES_PER_STEP (4).
|
||||
The cap matters for bidirectional sessions: the per-file
|
||||
scratch-buffer optimization (introduced same release)
|
||||
makes EmitData ~10x cheaper, and Pump only reads ONE
|
||||
inbound frame per call, so an unbounded inner loop
|
||||
produced an 8:1 send/recv ratio per cycle that starved
|
||||
the RX side -- visible in test_xfer_continuous as small
|
||||
inbound files not completing before a concurrent big
|
||||
outbound finished. Cap of 4 keeps the emit-burst
|
||||
per Step long enough to amortize Step's overhead
|
||||
(perf preserved), short enough that two Step calls
|
||||
fill a typical 8-frame window with a Pump in between
|
||||
(bidir fairness preserved). }
|
||||
MaxBurst := MAX_FRAMES_PER_STEP;
|
||||
while (MaxBurst > 0) and WindowOpen and
|
||||
(FTxPos < FTxItem.Size) do
|
||||
begin
|
||||
EmitData;
|
||||
Dec(MaxBurst);
|
||||
Result := True;
|
||||
end;
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user