Files
fpc-msgbase/src/ma.batch.pas
Ken Johnson b79e7fb31d Trim verbose block comments, add docs/API.md, sentinel cleanup on release
- ma.lock.Release now unlinks the .lck sentinel after releasing
  the OS lock (SysUtils.DeleteFile to avoid Windows API collision)
- Unit headers trimmed to standard dev-comment form (purpose only)
- docs/API.md: complete API reference with runnable examples,
  event types, locking semantics, tosser wiring, native-backend
  drop-down pattern, per-format cheat-sheet
- README links to the new doc
2026-04-15 09:34:05 -07:00

334 lines
8.2 KiB
ObjectPascal

{ ma.batch - concurrent packet tosser. Worker pool pulls
filenames from a queue, hands each message to a caller-supplied
processor. GetOrCreateBase caches one TMessageBase per area so
writes serialise through that instance's layer-1 lock. }
unit ma.batch;
{$mode objfpc}{$H+}
interface
uses
Classes, SysUtils,
ma.types, ma.events, ma.api,
ma.fmt.pkt, ma.fmt.pkt.uni;
type
TPacketProcessor = procedure(const APacketPath: AnsiString;
var Msg: TUniMessage;
var Stop: boolean) of object;
TPacketBatch = class;
TPacketWorker = class(TThread)
private
FBatch: TPacketBatch;
protected
procedure Execute; override;
public
constructor Create(ABatch: TPacketBatch);
end;
TBaseRegistryEntry = class
Key: AnsiString;
Base: TMessageBase;
CS: TRTLCriticalSection;
end;
TCountedEvent = class
private
FCount: integer;
FCS: TRTLCriticalSection;
FEvent: PRTLEvent;
public
constructor Create(Initial: integer);
destructor Destroy; override;
procedure Signal;
procedure WaitFor;
end;
TPacketBatch = class
private
FPacketDir: AnsiString;
FPacketMask: AnsiString;
FThreadCount: integer;
FQueue: TStringList;
FQueueCS: TRTLCriticalSection;
FRegistryCS: TRTLCriticalSection;
FRegistry: TStringList; { owns TBaseRegistryEntry in Objects[] }
FEvents: TMessageEvents;
FProcessor: TPacketProcessor;
FStopFlag: boolean;
FWorkersDone: TCountedEvent;
function PopPacket(out Path: AnsiString): boolean;
procedure ProcessPacket(const APath: AnsiString);
public
constructor Create(const APacketDir: AnsiString;
AThreadCount: integer = 4);
destructor Destroy; override;
{ Blocks until the queue is drained. Builds the queue by
scanning APacketDir + FPacketMask once, then launches workers.
Returns the number of packets processed. }
function Run: longint;
{ Request cooperative stop. Workers finish their current
packet then exit. }
procedure RequestStop;
{ Look up an existing base or create + cache one for this
format + path pair. Thread-safe; the returned base is
shared across workers, so callers must serialise writes
against its lock (WriteMessage already does this internally
via the base's in-process critical section). }
function GetOrCreateBase(AFormat: TMsgBaseFormat;
const APath: AnsiString): TMessageBase;
property PacketDir: AnsiString read FPacketDir;
property PacketMask: AnsiString read FPacketMask write FPacketMask;
property ThreadCount: integer read FThreadCount write FThreadCount;
property Processor: TPacketProcessor read FProcessor write FProcessor;
property Events: TMessageEvents read FEvents;
end;
implementation
{ ---------- TCountedEvent ---------- }
constructor TCountedEvent.Create(Initial: integer);
begin
inherited Create;
FCount := Initial;
InitCriticalSection(FCS);
FEvent := RTLEventCreate;
if Initial <= 0 then
RTLEventSetEvent(FEvent);
end;
destructor TCountedEvent.Destroy;
begin
RTLEventDestroy(FEvent);
DoneCriticalSection(FCS);
inherited Destroy;
end;
procedure TCountedEvent.Signal;
begin
EnterCriticalSection(FCS);
try
Dec(FCount);
if FCount <= 0 then
RTLEventSetEvent(FEvent);
finally
LeaveCriticalSection(FCS);
end;
end;
procedure TCountedEvent.WaitFor;
begin
RTLEventWaitFor(FEvent);
end;
{ ---------- TPacketWorker ---------- }
constructor TPacketWorker.Create(ABatch: TPacketBatch);
begin
FBatch := ABatch;
inherited Create(False);
FreeOnTerminate := False;
end;
procedure TPacketWorker.Execute;
var
path: AnsiString;
begin
try
while FBatch.PopPacket(path) do begin
if FBatch.FStopFlag then break;
try
FBatch.ProcessPacket(path);
except
on E: Exception do
FBatch.Events.FireSimple(metPacketError, path, E.Message);
end;
end;
finally
FBatch.FWorkersDone.Signal;
end;
end;
{ ---------- TPacketBatch ---------- }
constructor TPacketBatch.Create(const APacketDir: AnsiString;
AThreadCount: integer);
begin
inherited Create;
FPacketDir := APacketDir;
FPacketMask := '*.pkt';
FThreadCount := AThreadCount;
if FThreadCount < 1 then FThreadCount := 1;
FQueue := TStringList.Create;
FRegistry := TStringList.Create;
FRegistry.Sorted := True;
FRegistry.Duplicates := dupError;
InitCriticalSection(FQueueCS);
InitCriticalSection(FRegistryCS);
FEvents := TMessageEvents.Create(Self);
FStopFlag := False;
FWorkersDone := TCountedEvent.Create(0);
end;
destructor TPacketBatch.Destroy;
var
i: integer;
entry: TBaseRegistryEntry;
begin
for i := 0 to FRegistry.Count - 1 do begin
entry := TBaseRegistryEntry(FRegistry.Objects[i]);
if entry.Base <> nil then
entry.Base.Free;
DoneCriticalSection(entry.CS);
entry.Free;
end;
FRegistry.Free;
FQueue.Free;
DoneCriticalSection(FQueueCS);
DoneCriticalSection(FRegistryCS);
FEvents.Free;
FWorkersDone.Free;
inherited Destroy;
end;
function TPacketBatch.PopPacket(out Path: AnsiString): boolean;
begin
Result := False;
EnterCriticalSection(FQueueCS);
try
if FQueue.Count = 0 then exit;
Path := FQueue[0];
FQueue.Delete(0);
Result := True;
finally
LeaveCriticalSection(FQueueCS);
end;
end;
procedure TPacketBatch.ProcessPacket(const APath: AnsiString);
var
pkt: TPktFile;
msg: TUniMessage;
nat: TPktMessage;
stop: boolean;
begin
FEvents.FireSimple(metPacketStart, APath, '');
try
pkt := TPktFile.Open(APath);
except
on E: Exception do begin
FEvents.FireSimple(metPacketError, APath,
'open failed: ' + E.Message);
exit;
end;
end;
try
stop := False;
while (not stop) and (not pkt.AtEnd) do begin
if not pkt.ReadMessage(nat) then break;
msg := PktToUni(nat);
FEvents.FireSimple(metPacketMessage, APath, msg.Subject);
if Assigned(FProcessor) then
FProcessor(APath, msg, stop);
if FStopFlag then break;
end;
finally
pkt.Free;
FEvents.FireSimple(metPacketEnd, APath, '');
end;
end;
function TPacketBatch.Run: longint;
var
sr: TSearchRec;
dir: AnsiString;
i: integer;
workers: array of TPacketWorker;
startedCount: longint;
begin
FStopFlag := False;
FQueue.Clear;
dir := IncludeTrailingPathDelimiter(FPacketDir);
if FindFirst(dir + FPacketMask, faAnyFile, sr) = 0 then
try
repeat
if (sr.Attr and faDirectory) = 0 then
FQueue.Add(dir + sr.Name);
until FindNext(sr) <> 0;
finally
FindClose(sr);
end;
Result := FQueue.Count;
if Result = 0 then exit;
FWorkersDone.Free;
FWorkersDone := TCountedEvent.Create(FThreadCount);
startedCount := FThreadCount;
SetLength(workers, startedCount);
for i := 0 to startedCount - 1 do
workers[i] := TPacketWorker.Create(Self);
FWorkersDone.WaitFor;
for i := 0 to startedCount - 1 do begin
workers[i].WaitFor;
workers[i].Free;
end;
end;
procedure TPacketBatch.RequestStop;
begin
FStopFlag := True;
end;
function TPacketBatch.GetOrCreateBase(AFormat: TMsgBaseFormat;
const APath: AnsiString): TMessageBase;
var
key: AnsiString;
idx: integer;
entry: TBaseRegistryEntry;
begin
Result := nil;
key := IntToStr(ord(AFormat)) + '|' + APath;
EnterCriticalSection(FRegistryCS);
try
if FRegistry.Find(key, idx) then
entry := TBaseRegistryEntry(FRegistry.Objects[idx])
else begin
entry := TBaseRegistryEntry.Create;
entry.Key := key;
InitCriticalSection(entry.CS);
entry.Base := MessageBaseOpen(AFormat, APath, momReadWrite);
if entry.Base = nil then begin
DoneCriticalSection(entry.CS);
entry.Free;
exit;
end;
if not entry.Base.Open then begin
entry.Base.Free;
DoneCriticalSection(entry.CS);
entry.Free;
exit;
end;
FRegistry.AddObject(key, entry);
end;
Result := entry.Base;
finally
LeaveCriticalSection(FRegistryCS);
end;
end;
end.