- ma.api: add per-instance FOpCS critical section to serialise Do* calls (fixes racing writers that dropped 8/100 messages) - .uni adapters: momCreate pre-creates empty format files - example_read/example_write/example_tosser - tests: test_read (samples), test_roundtrip (all 5 storage formats), test_lock (4 threads/100 msgs), test_batch (5 pkts*10 msgs/3 threads) - run_tests.sh: single-command test runner - build.sh: per-target binutils (i386-linux, i386-freebsd12, i386-emx)
140 lines
3.9 KiB
ObjectPascal
140 lines
3.9 KiB
ObjectPascal
{
  example_tosser.pas - minimal concurrent packet tosser.

  Usage: example_tosser <inbound_dir> <dest_format> <dest_path> [<threads>]

  Walks *.pkt in <inbound_dir> with N worker threads (default 4),
  reads each message, and writes it into the destination base via
  the unified API. The batch registry caches one destination base
  per path; per-base locks in TMessageBase serialise writes.

  Example:
    example_tosser /inbound jam /msg/newecho 4

  Meant as a smoke test / template for real tossers -- feed it a
  directory of captured packets and watch the counts line up.
}
|
|
|
|
program example_tosser;
|
|
|
|
{$mode objfpc}{$H+}
|
|
|
|
uses
|
|
{$IFDEF UNIX}cthreads,{$ENDIF}
|
|
SysUtils, SyncObjs,
|
|
ma.types, ma.events, ma.api, ma.batch,
|
|
ma.fmt.hudson, ma.fmt.hudson.uni,
|
|
ma.fmt.jam, ma.fmt.jam.uni,
|
|
ma.fmt.squish, ma.fmt.squish.uni,
|
|
ma.fmt.msg, ma.fmt.msg.uni,
|
|
ma.fmt.pkt, ma.fmt.pkt.uni,
|
|
ma.fmt.pcboard, ma.fmt.pcboard.uni,
|
|
ma.fmt.ezycom, ma.fmt.ezycom.uni,
|
|
ma.fmt.goldbase, ma.fmt.goldbase.uni;
|
|
|
|
type
  { TSimpleTosser -- sends every message it is handed to the one
    destination base named on the command line.  A production tosser
    would choose the destination per message, by area tag. }
  TSimpleTosser = class
    { Destination base: storage format and path, passed to
      Batch.GetOrCreateBase for every message. }
    DestFormat: TMsgBaseFormat;
    DestPath: AnsiString;

    { Batch that drives OnMessage and supplies the destination base. }
    Batch: TPacketBatch;

    { TotalIn  : messages handed to OnMessage.
      TotalOut : messages for which WriteMessage returned True.
      Both are modified only while CountCS is held. }
    TotalIn, TotalOut: integer;

    { Guards the two counters.  The class does not initialise or
      release it; the owner must call InitCriticalSection before use
      and DoneCriticalSection afterwards. }
    CountCS: TRTLCriticalSection;

    { Per-message callback (assigned to Batch.Processor). }
    procedure OnMessage(const APacketPath: AnsiString; var Msg: TUniMessage; var Stop: boolean);

    { Log callback (assigned to Batch.Events.OnLog). }
    procedure OnLog(Level: TMsgEventType; const Source, Msg: AnsiString);
  end;
|
|
|
|
{ Per-message callback: counts the incoming message, obtains the
  destination base from the batch and writes the message into it.

  APacketPath - path of the packet the message came from (not used here).
  Msg         - the message to forward.
  Stop        - left untouched; this callback never asks the batch to stop.

  TotalIn is bumped for every call; TotalOut only when WriteMessage
  reports success.  If no destination base is returned, the message is
  counted as read but not written. }
procedure TSimpleTosser.OnMessage(const APacketPath: AnsiString;
                                  var Msg: TUniMessage;
                                  var Stop: boolean);

  { Increment one of the shared counters while holding CountCS. }
  procedure Bump(var Counter: integer);
  begin
    EnterCriticalSection(CountCS);
    Inc(Counter);
    LeaveCriticalSection(CountCS);
  end;

var
  target: TMessageBase;
begin
  Bump(TotalIn);

  target := Batch.GetOrCreateBase(DestFormat, DestPath);

  { Short-circuit evaluation: WriteMessage is only called on a
    non-nil base. }
  if Assigned(target) and target.WriteMessage(Msg) then
    Bump(TotalOut);
end;
|
|
|
|
{ Log callback: prints one line to standard output in the form
    [<level>] <source>: <message>
  where <level> is the text EventTypeToStr returns for Level. }
procedure TSimpleTosser.OnLog(Level: TMsgEventType;
                              const Source, Msg: AnsiString);
begin
  WriteLn(Format('[%s] %s: %s', [EventTypeToStr(Level), Source, Msg]));
end;
|
|
|
|
{ Map a format name given on the command line to its TMsgBaseFormat.

  S       - format name; compared case-insensitively.
  AFormat - receives the matching format.  Left unassigned when the
            name is not recognised.
  Returns True when S names a known format, False otherwise. }
function ParseFormat(const S: string; out AFormat: TMsgBaseFormat): boolean;
const
  { Parallel tables: KnownNames[i] selects KnownFormats[i]. }
  KnownNames: array[0..6] of string = (
    'hudson', 'jam', 'squish', 'msg', 'pcboard', 'ezycom', 'goldbase');
  KnownFormats: array[0..6] of TMsgBaseFormat = (
    mbfHudson, mbfJam, mbfSquish, mbfMsg, mbfPCBoard, mbfEzyCom, mbfGoldBase);
var
  wanted: string;
  i: integer;
begin
  wanted := LowerCase(S);
  for i := Low(KnownNames) to High(KnownNames) do
    if KnownNames[i] = wanted then begin
      AFormat := KnownFormats[i];
      exit(True);
    end;
  Result := False;
end;
|
|
|
|
const
  { Worker-thread count used when none is given or the given value is
    unusable. }
  DEFAULT_THREADS = 4;

var
  inbound, fmtName, destPath: string;
  threads: integer;
  tosser: TSimpleTosser;
  processed: longint;
  code: integer;
  fmt: TMsgBaseFormat;

{ Program entry point.

  Command line: <inbound> <dest_fmt> <dest_path> [threads]

  Exits with code 2 on a usage error or an unknown destination format.
  Otherwise runs the batch and prints the packet and message counts. }
begin
  if ParamCount < 3 then begin
    WriteLn('Usage: example_tosser <inbound> <dest_fmt> <dest_path> [threads]');
    Halt(2);
  end;

  inbound  := ParamStr(1);
  fmtName  := ParamStr(2);
  destPath := ParamStr(3);

  { Optional thread count.  Fall back to the default both when the
    argument is not a number and when it parses to a non-positive
    value, so the batch is never started with zero or negative workers. }
  threads := DEFAULT_THREADS;
  if ParamCount >= 4 then begin
    Val(ParamStr(4), threads, code);
    if (code <> 0) or (threads < 1) then
      threads := DEFAULT_THREADS;
  end;

  { Validate the format before anything is allocated: Halt does not
    run finally blocks, so every Halt path must come before the first
    resource is acquired. }
  if not ParseFormat(fmtName, fmt) then begin
    WriteLn('bad format: ', fmtName);
    Halt(2);
  end;

  { Each resource gets its own try..finally so that an exception in
    the batch constructor or in Run still releases everything acquired
    so far.  Release order: batch, critical section, tosser. }
  tosser := TSimpleTosser.Create;
  try
    { TotalIn / TotalOut start at zero: class instances are zero-filled
      on creation. }
    InitCriticalSection(tosser.CountCS);
    try
      tosser.DestFormat := fmt;
      tosser.DestPath   := destPath;

      tosser.Batch := TPacketBatch.Create(inbound, threads);
      try
        tosser.Batch.Processor    := @tosser.OnMessage;
        tosser.Batch.Events.OnLog := @tosser.OnLog;

        processed := tosser.Batch.Run;

        WriteLn;
        WriteLn('packets processed: ', processed);
        WriteLn('messages read: ', tosser.TotalIn);
        WriteLn('messages written: ', tosser.TotalOut);
      finally
        tosser.Batch.Free;
      end;
    finally
      DoneCriticalSection(tosser.CountCS);
    end;
  finally
    tosser.Free;
  end;
end.
|