Files
fpc-msgbase/examples/example_tosser.pas
Ken Johnson c68a225ad9 Tests + examples: 11 tests passing, full 6-target build matrix
- ma.api: add per-instance FOpCS critical section to serialise Do*
  calls (fixes racing writers that dropped 8/100 messages)
- .uni adapters: momCreate pre-creates empty format files
- example_read/example_write/example_tosser
- tests: test_read (samples), test_roundtrip (all 5 storage formats),
  test_lock (4 threads/100 msgs), test_batch (5 pkts*10 msgs/3 threads)
- run_tests.sh: single-command test runner
- build.sh: per-target binutils (i386-linux, i386-freebsd12, i386-emx)
2026-04-15 08:29:37 -07:00

140 lines
3.9 KiB
ObjectPascal

{
example_tosser.pas - minimal concurrent packet tosser.
Usage: example_tosser <inbound_dir> <dest_format> <dest_path> [<threads>]
Walks *.pkt in <inbound_dir> with N worker threads (default 4),
reads each message, and writes it into the destination base via
the unified API. The batch registry caches one destination base
per path; per-base locks in TMessageBase serialise writes.
Example:
example_tosser /inbound jam /msg/newecho 4
Meant as a smoke test / template for real tossers -- feed it a
directory of captured packets and watch the counts line up.
}
program example_tosser;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}cthreads,{$ENDIF}
SysUtils, SyncObjs,
ma.types, ma.events, ma.api, ma.batch,
ma.fmt.hudson, ma.fmt.hudson.uni,
ma.fmt.jam, ma.fmt.jam.uni,
ma.fmt.squish, ma.fmt.squish.uni,
ma.fmt.msg, ma.fmt.msg.uni,
ma.fmt.pkt, ma.fmt.pkt.uni,
ma.fmt.pcboard, ma.fmt.pcboard.uni,
ma.fmt.ezycom, ma.fmt.ezycom.uni,
ma.fmt.goldbase, ma.fmt.goldbase.uni;
type
  { TSimpleTosser - forwards every message it is handed to the one
    destination base named on the command line. A real tosser would
    pick the destination per message (for example by area tag).

    The object carries no constructor of its own: the main program
    fills in the fields and initialises/destroys CountCS. }
  TSimpleTosser = class
  public
    { Storage format of the destination base. }
    DestFormat: TMsgBaseFormat;
    { Path of the destination base, as given on the command line. }
    DestPath: AnsiString;
    { Packet batch that drives OnMessage and hands out the destination base. }
    Batch: TPacketBatch;
    { Tallies: messages received by OnMessage / messages successfully written.
      Both are only modified while CountCS is held. }
    TotalIn, TotalOut: integer;
    { Guards TotalIn and TotalOut. }
    CountCS: TRTLCriticalSection;

    { Per-message callback: counts the message and writes it to the
      destination base. }
    procedure OnMessage(const APacketPath: AnsiString;
                        var Msg: TUniMessage;
                        var Stop: boolean);
    { Log callback: prints one formatted line to standard output. }
    procedure OnLog(Level: TMsgEventType;
                    const Source, Msg: AnsiString);
  end;
{ Per-message callback installed as the batch Processor.

  Counts the incoming message, obtains the destination base from the
  batch (keyed by DestFormat/DestPath) and writes the message to it.
  TotalOut is only incremented when WriteMessage reports success; if no
  base could be obtained the message is counted as read but not written.

  APacketPath and Stop are part of the callback signature and are not
  used here. }
procedure TSimpleTosser.OnMessage(const APacketPath: AnsiString;
                                  var Msg: TUniMessage;
                                  var Stop: boolean);

  { Increment one of the tallies while holding CountCS. }
  procedure Bump(var Counter: integer);
  begin
    EnterCriticalSection(CountCS);
    Inc(Counter);
    LeaveCriticalSection(CountCS);
  end;

var
  target: TMessageBase;
begin
  Bump(TotalIn);

  target := Batch.GetOrCreateBase(DestFormat, DestPath);
  if not Assigned(target) then
    exit;

  if target.WriteMessage(Msg) then
    Bump(TotalOut);
end;
{ Log callback: prints "[<level>] <source>: <message>" on standard output.
  The level text comes from EventTypeToStr. }
procedure TSimpleTosser.OnLog(Level: TMsgEventType;
                              const Source, Msg: AnsiString);
var
  tag: AnsiString;
begin
  tag := EventTypeToStr(Level);
  WriteLn('[', tag, '] ', Source, ': ', Msg);
end;
{ Translate a format name given on the command line into a
  TMsgBaseFormat value. Matching is case-insensitive.

  Recognised names: hudson, jam, squish, msg, pcboard, ezycom, goldbase.

  Returns True and sets AFormat when S is recognised. Returns False
  otherwise, in which case AFormat is left unassigned and must not be
  used by the caller. }
function ParseFormat(const S: string; out AFormat: TMsgBaseFormat): boolean;
var
  key: string;
begin
  key := LowerCase(S);
  Result := True;
  if key = 'hudson' then
    AFormat := mbfHudson
  else if key = 'jam' then
    AFormat := mbfJam
  else if key = 'squish' then
    AFormat := mbfSquish
  else if key = 'msg' then
    AFormat := mbfMsg
  else if key = 'pcboard' then
    AFormat := mbfPCBoard
  else if key = 'ezycom' then
    AFormat := mbfEzyCom
  else if key = 'goldbase' then
    AFormat := mbfGoldBase
  else
    Result := False;
end;
const
  { Worker count used when [threads] is omitted or unusable. }
  DefaultThreads = 4;
var
  inbound, fmtName, destPath: string;
  threads: integer;
  tosser: TSimpleTosser;
  processed: longint;
  code: integer;
  destFormat: TMsgBaseFormat;
begin
  { --- command line ------------------------------------------------ }
  if ParamCount < 3 then begin
    WriteLn('Usage: example_tosser <inbound> <dest_fmt> <dest_path> [threads]');
    Halt(2);
  end;
  inbound  := ParamStr(1);
  fmtName  := ParamStr(2);
  destPath := ParamStr(3);

  threads := DefaultThreads;
  if ParamCount >= 4 then begin
    Val(ParamStr(4), threads, code);
    { Fall back to the default for malformed input AND for values that
      parse but make no sense as a worker count (zero or negative). }
    if (code <> 0) or (threads < 1) then
      threads := DefaultThreads;
  end;

  { Validate the format before allocating anything, so the error path
    has nothing to clean up. }
  if not ParseFormat(fmtName, destFormat) then begin
    WriteLn('bad format: ', fmtName);
    Halt(2);
  end;

  { --- run ---------------------------------------------------------- }
  tosser := TSimpleTosser.Create;
  try
    InitCriticalSection(tosser.CountCS);
    try
      tosser.TotalIn    := 0;
      tosser.TotalOut   := 0;
      tosser.DestFormat := destFormat;
      tosser.DestPath   := destPath;

      tosser.Batch := TPacketBatch.Create(inbound, threads);
      try
        tosser.Batch.Processor    := @tosser.OnMessage;
        tosser.Batch.Events.OnLog := @tosser.OnLog;

        processed := tosser.Batch.Run;

        WriteLn;
        WriteLn('packets processed: ', processed);
        WriteLn('messages read: ', tosser.TotalIn);
        WriteLn('messages written: ', tosser.TotalOut);
      finally
        { Release the batch before the critical section its callbacks use. }
        tosser.Batch.Free;
        tosser.Batch := nil;
      end;
    finally
      DoneCriticalSection(tosser.CountCS);
    end;
  finally
    tosser.Free;
  end;
end.