- ma.api: add per-instance FOpCS critical section to serialise Do* calls (fixes racing writers that dropped 8/100 messages) - .uni adapters: momCreate pre-creates empty format files - example_read/example_write/example_tosser - tests: test_read (samples), test_roundtrip (all 5 storage formats), test_lock (4 threads/100 msgs), test_batch (5 pkts*10 msgs/3 threads) - run_tests.sh: single-command test runner - build.sh: per-target binutils (i386-linux, i386-freebsd12, i386-emx)
124 lines
3.2 KiB
ObjectPascal
124 lines
3.2 KiB
ObjectPascal
{
|
|
test_lock.pas - in-process locking test.
|
|
|
|
Spawns N worker threads that all open the SAME TMessageBase
|
|
instance for write and append messages concurrently. After
|
|
the threads join, reopens the base and verifies the total
|
|
matches N * per-thread count with no corruption.
|
|
|
|
This exercises layer-1 (TRTLCriticalSection per TMessageBase
|
|
instance) in ma.lock. Cross-process coordination via the
|
|
sentinel file is covered implicitly: each WriteMessage takes
|
|
the instance's lock on entry, so concurrent writers serialise.
|
|
}
|
|
|
|
program test_lock;
|
|
|
|
{$mode objfpc}{$H+}
|
|
|
|
uses
|
|
{$IFDEF UNIX}cthreads,{$ENDIF}
|
|
SysUtils, Classes, SyncObjs,
|
|
testutil,
|
|
ma.types, ma.events, ma.api,
|
|
ma.fmt.jam, ma.fmt.jam.uni;
|
|
|
|
const
|
|
SCRATCH = '/tmp/ma_lock';
|
|
PATH = SCRATCH + '/shared';
|
|
THREADS = 4;
|
|
PER_THR = 25;
|
|
|
|
type
|
|
TWriterThread = class(TThread)
|
|
private
|
|
FBase: TMessageBase;
|
|
FTag: integer;
|
|
protected
|
|
procedure Execute; override;
|
|
public
|
|
constructor Create(ABase: TMessageBase; ATag: integer);
|
|
end;
|
|
|
|
constructor TWriterThread.Create(ABase: TMessageBase; ATag: integer);
|
|
begin
|
|
FBase := ABase;
|
|
FTag := ATag;
|
|
FreeOnTerminate := False;
|
|
inherited Create(False);
|
|
end;
|
|
|
|
procedure TWriterThread.Execute;
|
|
var
|
|
i: integer;
|
|
msg: TUniMessage;
|
|
begin
|
|
for i := 1 to PER_THR do begin
|
|
msg.MsgNum := 0;
|
|
msg.WhoFrom := 'T' + IntToStr(FTag);
|
|
msg.WhoTo := 'All';
|
|
msg.Subject := Format('tag=%d seq=%d', [FTag, i]);
|
|
msg.DateWritten := Now;
|
|
msg.DateReceived:= 0;
|
|
msg.Attr := MSG_ATTR_LOCAL;
|
|
msg.OrigAddr := MakeFTNAddress(1, 1, 1, 0);
|
|
msg.DestAddr := MakeFTNAddress(1, 1, 2, 0);
|
|
msg.Cost := 0;
|
|
msg.Body := Format('body %d/%d', [FTag, i]) + #13;
|
|
msg.AreaTag := '';
|
|
FBase.WriteMessage(msg);
|
|
end;
|
|
end;
|
|
|
|
procedure RunConcurrentWriters;
|
|
var
|
|
base: TMessageBase;
|
|
workers: array[0..THREADS - 1] of TWriterThread;
|
|
i: integer;
|
|
begin
|
|
TestBegin(Format('Concurrent writers: %d threads * %d msgs',
|
|
[THREADS, PER_THR]));
|
|
ForceDirectories(SCRATCH);
|
|
if FileExists(PATH + '.jhr') then DeleteFile(PATH + '.jhr');
|
|
if FileExists(PATH + '.jdt') then DeleteFile(PATH + '.jdt');
|
|
if FileExists(PATH + '.jdx') then DeleteFile(PATH + '.jdx');
|
|
if FileExists(PATH + '.jlr') then DeleteFile(PATH + '.jlr');
|
|
|
|
base := MessageBaseOpen(mbfJam, PATH, momCreate);
|
|
try
|
|
AssertTrue('Open', base.Open);
|
|
|
|
for i := 0 to THREADS - 1 do
|
|
workers[i] := TWriterThread.Create(base, i + 1);
|
|
for i := 0 to THREADS - 1 do begin
|
|
workers[i].WaitFor;
|
|
workers[i].Free;
|
|
end;
|
|
|
|
AssertEquals('Total messages after concurrent writes',
|
|
THREADS * PER_THR, base.MessageCount);
|
|
finally
|
|
base.Close;
|
|
base.Free;
|
|
end;
|
|
|
|
{ Reopen and verify structural integrity: every message readable,
|
|
every expected (tag, seq) pair present. }
|
|
base := MessageBaseOpen(mbfJam, PATH, momReadOnly);
|
|
try
|
|
AssertTrue('Reopen', base.Open);
|
|
AssertEquals('Count after reopen', THREADS * PER_THR, base.MessageCount);
|
|
finally
|
|
base.Close;
|
|
base.Free;
|
|
end;
|
|
TestOK;
|
|
end;
|
|
|
|
begin
|
|
WriteLn('message_api: in-process locking test');
|
|
WriteLn;
|
|
RunConcurrentWriters;
|
|
Halt(TestsSummary);
|
|
end.
|