{ test_lock.pas - in-process locking test. Spawns N worker threads that all open the SAME TMessageBase instance for write and append messages concurrently. After the threads join, reopens the base and verifies the total matches N * per-thread count with no corruption. This exercises layer-1 (TRTLCriticalSection per TMessageBase instance) in mb.lock. Cross-process coordination via the sentinel file is covered implicitly: each WriteMessage takes the instance's lock on entry, so concurrent writers serialise. } program test_lock; {$mode objfpc}{$H+} uses {$IFDEF UNIX}cthreads,{$ENDIF} SysUtils, Classes, SyncObjs, testutil, mb.address, mb.types, mb.events, mb.api, mb.fmt.jam, mb.fmt.jam.uni; const SCRATCH = '/tmp/ma_lock'; PATH = SCRATCH + '/shared'; THREADS = 4; PER_THR = 25; type TWriterThread = class(TThread) private FBase: TMessageBase; FTag: integer; protected procedure Execute; override; public constructor Create(ABase: TMessageBase; ATag: integer); end; constructor TWriterThread.Create(ABase: TMessageBase; ATag: integer); begin FBase := ABase; FTag := ATag; FreeOnTerminate := False; inherited Create(False); end; procedure TWriterThread.Execute; var i: integer; msg: TUniMessage; begin for i := 1 to PER_THR do begin msg.Attributes.Clear; msg.Attributes.SetValue('from', 'T' + IntToStr(FTag)); msg.Attributes.SetValue('to', 'All'); msg.Attributes.SetValue('subject', Format('tag=%d seq=%d', [FTag, i])); msg.Attributes.SetDate('date.written', Now); msg.Attributes.SetBool('attr.local', true); msg.Attributes.SetAddr('addr.orig', MakeFTNAddress(1, 1, 1, 0)); msg.Attributes.SetAddr('addr.dest', MakeFTNAddress(1, 1, 2, 0)); msg.Body := Format('body %d/%d', [FTag, i]) + #13; FBase.WriteMessage(msg); end; end; procedure RunConcurrentWriters; var base: TMessageBase; workers: array[0..THREADS - 1] of TWriterThread; i: integer; begin TestBegin(Format('Concurrent writers: %d threads * %d msgs', [THREADS, PER_THR])); ForceDirectories(SCRATCH); if FileExists(PATH + '.jhr') then DeleteFile(PATH + '.jhr'); if FileExists(PATH + '.jdt') then DeleteFile(PATH + '.jdt'); if FileExists(PATH + '.jdx') then DeleteFile(PATH + '.jdx'); if FileExists(PATH + '.jlr') then DeleteFile(PATH + '.jlr'); base := MessageBaseOpen(mbfJam, PATH, momCreate); try AssertTrue('Open', base.Open); for i := 0 to THREADS - 1 do workers[i] := TWriterThread.Create(base, i + 1); for i := 0 to THREADS - 1 do begin workers[i].WaitFor; workers[i].Free; end; AssertEquals('Total messages after concurrent writes', THREADS * PER_THR, base.MessageCount); finally base.Close; base.Free; end; { Reopen and verify structural integrity: every message readable, every expected (tag, seq) pair present. } base := MessageBaseOpen(mbfJam, PATH, momReadOnly); try AssertTrue('Reopen', base.Open); AssertEquals('Count after reopen', THREADS * PER_THR, base.MessageCount); finally base.Close; base.Free; end; TestOK; end; begin WriteLn('fpc-msgbase: in-process locking test'); WriteLn; RunConcurrentWriters; Halt(TestsSummary); end.