QueueManager
in package
Main QueueManager.
- Build bus from provided handlers (simple auto-register provided handlers).
- Manage drivers.
- Dispatch tasks (store to driver).
- runWorker() polls driver and dispatches to bus; handles retry with exponential backoff.
Table of Contents
Properties
- $bus : MessageBusInterface
- $config : QueueConfig
- $dispatcher : EventDispatcher
- $drivers : array<string, MessageDriverInterface>
- $serializer : SerializerInterface
Methods
- __construct() : mixed
- count() : int
- Count messages in driver
- dispatch() : string
- Dispatch a task into given driver (store it).
- getDriver() : MessageDriverInterface
- isDriverOnline() : bool
- processScheduler() : void
- Register a ScheduleProviderInterface (Symfony Scheduler) programmatically.
- runWorker() : void
- Run worker loop which: - polls the given driver - skips messages whose available_at > now (delayed) - dispatches to the MessageBus - on exception applies retry policy and requeues with exponential backoff
- transferAll() : array<string, string>
- Transfer all messages between drivers (with optional transform callback).
- createDriverFromDsn() : MessageDriverInterface
- Create driver based on DSN. Supported: redis://..., doctrine:// or mysql://
Properties
$bus
private
MessageBusInterface
$bus
$config
private
QueueConfig
$config
$dispatcher
private
EventDispatcher
$dispatcher
$drivers
private
array<string, MessageDriverInterface>
$drivers
= []
$serializer
private
SerializerInterface
$serializer
Methods
__construct()
public
__construct(QueueConfig|array<string|int, mixed> $cfg[, array<string, callable> $handlers = [] ]) : mixed
Parameters
- $cfg : QueueConfig|array<string|int, mixed>
- $handlers : array<string, callable> = []
-
Map of message class => handler callable
count()
Count messages in driver
public
count([string $driver = 'default' ]) : int
Parameters
- $driver : string = 'default'
Return values
intdispatch()
Dispatch a task into given driver (store it).
public
dispatch(TaskInterface $task[, string $driver = 'default' ][, bool $prepend = false ]) : string
Parameters
- $task : TaskInterface
- $driver : string = 'default'
- $prepend : bool = false
Return values
stringgetDriver()
public
getDriver(string $name) : MessageDriverInterface
Parameters
- $name : string
Return values
MessageDriverInterfaceisDriverOnline()
public
isDriverOnline([string $driver = 'default' ]) : bool
Parameters
- $driver : string = 'default'
Return values
boolprocessScheduler()
Register a ScheduleProviderInterface (Symfony Scheduler) programmatically.
public
processScheduler([string $driver = 'default' ]) : void
Accept any ScheduleProviderInterface implementation (or add providers via config).
Parameters
- $driver : string = 'default'
runWorker()
Run worker loop which: - polls the given driver - skips messages whose available_at > now (delayed) - dispatches to the MessageBus - on exception applies retry policy and requeues with exponential backoff
public
runWorker([string $driver = 'default' ][, array{sleep_ms?: int, limit?: int} $opts = [] ]) : void
This is a simple single-process worker. Use Supervisor/systemd to run many processes.
Parameters
- $driver : string = 'default'
- $opts : array{sleep_ms?: int, limit?: int} = []
transferAll()
Transfer all messages between drivers (with optional transform callback).
public
transferAll(string $from, string $to[, bool $prepend = false ][, callable|null $transform = null ]) : array<string, string>
Parameters
- $from : string
- $to : string
- $prepend : bool = false
- $transform : callable|null = null
-
(array $envelope): array
Return values
array<string, string> —map oldId => newId
createDriverFromDsn()
Create driver based on DSN. Supported: redis://..., doctrine:// or mysql://
private
createDriverFromDsn(string $dsn[, array<string|int, mixed> $options = [] ]) : MessageDriverInterface
Parameters
- $dsn : string
- $options : array<string|int, mixed> = []