{
  test_batch.pas - concurrent packet batch toss.

  Generates PACKETS synthetic packets with PER_PKT messages each, runs them
  through a TPacketBatch configured with THREADS workers writing into a
  shared destination JAM base, then checks that:
    - the batch reports PACKETS packets processed,
    - the runner counted PACKETS * PER_PKT successful writes,
    - the reopened destination base reports PACKETS * PER_PKT messages.

  Only counts are compared; message contents are not read back, so this
  catches drops and duplicates but not content corruption.
}
program test_batch;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  SysUtils, Classes, SyncObjs,
  testutil,
  ma.types, ma.events, ma.api, ma.batch,
  ma.fmt.jam, ma.fmt.jam.uni,
  ma.fmt.pkt, ma.fmt.pkt.uni;

const
  SCRATCH  = '/tmp/ma_batch';
  INBOUND  = SCRATCH + '/inbound';
  DESTBASE = SCRATCH + '/echo';
  PACKETS  = 5;   // number of synthetic packets to generate
  PER_PKT  = 10;  // messages written into each packet
  THREADS  = 3;   // worker count handed to TPacketBatch.Create

type
  { Holds the state shared by all invocations of the batch callback. }
  TBatchRunner = class
    DestPath: AnsiString;        // path of the destination JAM base
    Batch: TPacketBatch;         // batch that owns/caches the destination base
    Written: longint;            // successful WriteMessage calls; guarded by CS
    CS: TRTLCriticalSection;     // protects Written
    procedure OnMessage(const APacketPath: AnsiString;
      var Msg: TUniMessage; var Stop: boolean);
  end;

{ Batch callback: writes Msg into the destination base and counts the
  successful write.

  APacketPath and Stop are not used here. If the batch cannot supply a base,
  the message is skipped silently; the count assertions in RunBatch will then
  expose the shortfall.

  NOTE(review): presumably invoked from the batch worker threads, and
  base.WriteMessage is called outside CS - this assumes the batch or the base
  serializes writes itself; confirm against ma.batch. }
procedure TBatchRunner.OnMessage(const APacketPath: AnsiString;
  var Msg: TUniMessage; var Stop: boolean);
var
  base: TMessageBase;
begin
  base := Batch.GetOrCreateBase(mbfJam, DestPath);
  if base = nil then
    exit;
  if base.WriteMessage(Msg) then
  begin
    // Written is shared between callers, so bump it under the lock.
    EnterCriticalSection(CS);
    Inc(Written);
    LeaveCriticalSection(CS);
  end;
end;

{ Removes every *.pkt file from INBOUND.

  Without this, packets left behind by an earlier run (for example one built
  with a larger PACKETS value, or one that aborted) would be picked up by the
  batch and break the count assertions. Names are collected first and deleted
  afterwards so the directory is not modified while it is being enumerated. }
procedure PurgeStalePackets;
var
  sr: TSearchRec;
  stale: TStringList;
  k: integer;
begin
  stale := TStringList.Create;
  try
    if FindFirst(INBOUND + '/*.pkt', faAnyFile, sr) = 0 then
      try
        repeat
          if (sr.Attr and faDirectory) = 0 then
            stale.Add(INBOUND + '/' + sr.Name);
        until FindNext(sr) <> 0;
      finally
        FindClose(sr);
      end;
    for k := 0 to stale.Count - 1 do
      DeleteFile(stale[k]);
  finally
    stale.Free;
  end;
end;

{ Creates PACKETS packet files in INBOUND, each holding PER_PKT messages
  followed by a terminator. The inbound directory is emptied of *.pkt files
  first so the batch sees exactly the packets generated here. }
procedure GeneratePackets;
var
  i, j: integer;
  pkt: TPktFile;
  hdr: TPktHeaderInfo;
  msg: TPktMessage;
  fname: string;
begin
  ForceDirectories(INBOUND);
  PurgeStalePackets;

  for i := 1 to PACKETS do
  begin
    fname := Format('%s/pkt%2.2d.pkt', [INBOUND, i]);
    hdr := TPktFile.BuildHeaderInfo(1, 1, 1, 0, 1, 1, 2, 0, '');
    pkt := TPktFile.CreateNew(fname, hdr);
    try
      for j := 1 to PER_PKT do
      begin
        FillChar(msg, SizeOf(msg), 0);
        msg.OrigNode := 1;
        msg.OrigNet := 1;
        msg.DestNode := 2;
        msg.DestNet := 1;
        msg.Attr := 0;
        msg.Cost := 0;
        msg.DateTime := '01 Apr 26 12:00:00';
        msg.WhoTo := 'All';
        msg.WhoFrom := Format('pkt%d', [i]);
        // Subject and body encode packet and message index.
        msg.Subject := Format('pkt=%d msg=%d', [i, j]);
        msg.Body := Format('AREA:TEST'#13'body %d/%d', [i, j]) + #13;
        pkt.WriteMessage(msg);
      end;
      pkt.WriteTerminator;
    finally
      pkt.Free;
    end;
  end;
end;

{ Runs the whole scenario: resets the destination base, generates packets,
  tosses them through a TPacketBatch, then reopens the destination read-only
  and compares its message count with the expected total. }
procedure RunBatch;
var
  runner: TBatchRunner;
  processed: longint;
  base: TMessageBase;
begin
  TestBegin(Format('Batch toss: %d pkts * %d msgs, %d threads',
    [PACKETS, PER_PKT, THREADS]));

  { Fresh scratch }
  ForceDirectories(SCRATCH);
  if FileExists(DESTBASE + '.jhr') then DeleteFile(DESTBASE + '.jhr');
  if FileExists(DESTBASE + '.jdt') then DeleteFile(DESTBASE + '.jdt');
  if FileExists(DESTBASE + '.jdx') then DeleteFile(DESTBASE + '.jdx');
  if FileExists(DESTBASE + '.jlr') then DeleteFile(DESTBASE + '.jlr');

  { Pre-create an empty JAM base so GetOrCreateBase(momReadWrite) has
    files to open. }
  base := MessageBaseOpen(mbfJam, DESTBASE, momCreate);
  try
    AssertTrue('Pre-create dest base', base.Open);
  finally
    base.Close;
    base.Free;
  end;

  GeneratePackets;

  // Nested try/finally so the batch, the critical section and the runner are
  // released in that order even when Run raises.
  runner := TBatchRunner.Create;
  try
    InitCriticalSection(runner.CS);
    try
      runner.DestPath := DESTBASE;
      runner.Written := 0;
      runner.Batch := TPacketBatch.Create(INBOUND, THREADS);
      try
        runner.Batch.Processor := @runner.OnMessage;
        processed := runner.Batch.Run;

        AssertEquals('Packets processed', PACKETS, processed);
        AssertEquals('Messages written', PACKETS * PER_PKT, runner.Written);
      finally
        runner.Batch.Free;
      end;
    finally
      DoneCriticalSection(runner.CS);
    end;
  finally
    runner.Free;
  end;

  { Reopen dest base; verify count matches what we claim we wrote. }
  base := MessageBaseOpen(mbfJam, DESTBASE, momReadOnly);
  try
    AssertTrue('Open dest (read)', base.Open);
    AssertEquals('Dest message count', PACKETS * PER_PKT, base.MessageCount);
  finally
    base.Close;
    base.Free;
  end;

  TestOK;
end;

begin
  WriteLn('fpc-msgbase: concurrent batch test');
  WriteLn;
  RunBatch;
  Halt(TestsSummary);
end.