NAME Data::PubSub::Shared - High-performance shared-memory pub/sub for Linux SYNOPSIS use Data::PubSub::Shared; # Publisher my $ps = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024); $ps->publish(42); $ps->publish_multi(1, 2, 3); # Subscriber (same or different process) my $ps2 = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024); my $sub = $ps2->subscribe; # future messages only my $sub2 = $ps2->subscribe_all; # from oldest available # Polling my $val = $sub->poll; # non-blocking, undef if empty my @v = $sub->drain; # all available my $val = $sub->poll_wait(1.5); # blocking with timeout # Callback-based (no per-message method dispatch) $sub->poll_cb(sub { process($_[0]) }); # String variant my $sps = Data::PubSub::Shared::Str->new('/tmp/ps.shm', 1024); $sps->publish("hello world"); # Compact variants (half the memory, same API) my $ps32 = Data::PubSub::Shared::Int32->new(undef, 65536); my $ps16 = Data::PubSub::Shared::Int16->new(undef, 65536); # Multiprocess if (fork() == 0) { my $child = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024); my $sub = $child->subscribe; while (defined(my $v = $sub->poll_wait(1))) { print "got: $v\n"; } exit; } $ps->publish(99); wait; DESCRIPTION Broadcast pub/sub over shared memory (mmap(MAP_SHARED)). Publishers write to a ring buffer; each subscriber independently reads with its own cursor. Messages are never consumed -- the ring overwrites old data when it wraps. Slow subscribers auto-recover by resetting to the oldest available position. Linux-only. Requires 64-bit Perl. Features * File-backed, anonymous, or memfd-backed mmap * Lock-free MPMC publish for integer variants * Lock-free subscribers for all variants (seqlock) * Variable-length Str messages (circular arena) * Futex-based blocking poll with timeout * PID-based stale lock recovery (Str) * Batch operations: publish_multi, drain, poll_cb, poll_wait_multi * Per-subscriber overflow counting * Keyword API via XS::Parse::Keyword Variants Data::PubSub::Shared::Int -- int64, 16 bytes/slot Lock-free MPMC publish via atomic fetch-and-add. Seqlock-protected subscribers. Best for counters, timestamps, event IDs. Data::PubSub::Shared::Int32 -- int32, 8 bytes/slot Data::PubSub::Shared::Int16 -- int16, 8 bytes/slot Compact variants -- half the memory, 2x cache density. Same lock-free algorithm. Values silently truncated to type range (C cast semantics). Best for status codes, small enums, sensor readings. Data::PubSub::Shared::Str -- variable-length strings Mutex-protected publish, lock-free subscribers. Messages stored in a circular arena (max capped at "msg_size", default 256 bytes). UTF-8 flag preserved. Best for log lines, JSON, serialized payloads. Int vs Str Int (including Int32/Int16): lock-free, zero contention between publishers. Use when the payload fits in an integer. Str: mutex serializes publishers, but subscribers are still lock-free. Use for arbitrary byte strings. API Constructor my $ps = Data::PubSub::Shared::Int->new($path, $capacity); my $ps = Data::PubSub::Shared::Str->new($path, $capacity); my $ps = Data::PubSub::Shared::Str->new($path, $capacity, $msg_size); $capacity is rounded up to the next power of 2. When opening an existing file, parameters are read from the stored header. Pass "undef" for $path for anonymous (fork-inherited) pub/sub. Replace "Int" with "Int32", "Int16", or "Str" as needed. memfd my $ps = Data::PubSub::Shared::Int->new_memfd($name, $capacity); my $fd = $ps->memfd; my $ps2 = Data::PubSub::Shared::Int->new_from_fd($fd); No filesystem path -- backed by memfd_create(2). Share via fork() inheritance or "SCM_RIGHTS" fd passing. The fd is dup'd internally by "new_from_fd". Publishing $ps->publish($value); # always succeeds my $n = $ps->publish_multi(@values); # batch (max 8192 values) $ps->publish_notify($value); # publish + eventfd notify Int: "publish_multi" claims all slots in one atomic fetch-add, then writes values and wakes subscribers once. Str: holds mutex for the entire batch. Subscribing my $sub = $ps->subscribe; # future messages only my $sub = $ps->subscribe_all; # from oldest available Subscribers are process-local. Each process creates its own. Polling my $val = $sub->poll; # non-blocking my @v = $sub->poll_multi($n); # up to $n my @v = $sub->drain; # all available my @v = $sub->drain($max); # up to $max my $val = $sub->poll_wait; # block forever my $val = $sub->poll_wait($timeout); # block with timeout my @v = $sub->poll_wait_multi($n, $timeout); # block for >=1 Callback Polling my $n = $sub->poll_cb(\&handler); Calls handler($msg) for each available message without returning to Perl between messages. Returns count processed. Event Loop Integration my $fd = $ps->eventfd; # create eventfd $ps->notify; # signal after publish $ps->eventfd_consume; # drain notification counter # Combined: consume eventfd + drain messages my @v = $sub->drain_notify; my @v = $sub->drain_notify($max); # EV example my $w = EV::io $fd, EV::READ, sub { my @msgs = $sub->drain_notify; process($_) for @msgs; }; Subscribers inherit the handle's eventfd at creation time. Use "$sub->eventfd_set($fd)" to set manually after creation. Status my $n = $sub->lag; # messages behind my $n = $sub->overflow_count; # total messages lost to overflow my $o = $sub->has_overflow; # true if currently overflowed my $c = $sub->cursor; # read position $sub->cursor($pos); # seek my $p = $sub->write_pos; # publisher position Cursor Management $sub->reset; # jump to latest (future messages only) $sub->reset_oldest; # jump to oldest available If a subscriber falls behind by more than "capacity" messages, "poll" auto-recovers by resetting to the oldest available position. Lost messages are counted in "overflow_count". Handle Management $ps->clear; # reset ring to initial state $ps->sync; # msync to disk $ps->unlink; # remove backing file Class->unlink($path); # class method form my $p = $ps->path; # undef for anonymous/memfd my $s = $ps->stats; # diagnostic hashref Keyword API use Data::PubSub::Shared::Int; ps_int_publish $ps, $value; my $val = ps_int_poll $sub; my $n = ps_int_lag $sub; Replace "int" with "int32", "int16", or "str". Keywords are lexically scoped. Crash Safety Str mode: futex mutex with PID tracking. If a publisher dies holding the mutex, other publishers recover within 2 seconds. Int modes are lock-free and need no recovery. BENCHMARKS Single-process, 1M items, Linux x86_64. Run "perl -Mblib bench/throughput.pl" to reproduce. PUBLISH + POLL (interleaved) Int 5.0M/s (16 bytes/slot) Int32 5.9M/s (8 bytes/slot) Int16 5.7M/s (8 bytes/slot) Str 2.5M/s (~30B messages) BATCH (100/batch) Int publish_multi: 170M/s Str publish_multi: 42M/s Fan-out: publish throughput is independent of subscriber count. SEE ALSO Data::Queue::Shared, Data::Buffer::Shared, Data::HashMap::Shared AUTHOR vividsnow LICENSE This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.