Documentation

QueueManager
in package

FinalYes

Main QueueManager.

  • Build bus from provided handlers (simple auto-register provided handlers).
  • Manage drivers.
  • Dispatch tasks (store to driver).
  • runWorker() polls driver and dispatches to bus; handles retry with exponential backoff.

Table of Contents

Properties

$bus  : MessageBusInterface
$config  : QueueConfig
$dispatcher  : EventDispatcher
$drivers  : array<string, MessageDriverInterface>
$serializer  : SerializerInterface

Methods

__construct()  : mixed
count()  : int
Count messages in driver
dispatch()  : string
Dispatch a task into given driver (store it).
getDriver()  : MessageDriverInterface
isDriverOnline()  : bool
processScheduler()  : void
Register a ScheduleProviderInterface (Symfony Scheduler) programmatically.
runWorker()  : void
Run worker loop which: - polls the given driver - skips messages whose available_at > now (delayed) - dispatches to the MessageBus - on exception applies retry policy and requeues with exponential backoff
transferAll()  : array<string, string>
Transfer all messages between drivers (with optional transform callback).
createDriverFromDsn()  : MessageDriverInterface
Create driver based on DSN. Supported: redis://..., doctrine:// or mysql://

Properties

Methods

__construct()

public __construct(QueueConfig|array<string|int, mixed> $cfg[, array<string, callable> $handlers = [] ]) : mixed
Parameters
$cfg : QueueConfig|array<string|int, mixed>
$handlers : array<string, callable> = []

Map of message class => handler callable

count()

Count messages in driver

public count([string $driver = 'default' ]) : int
Parameters
$driver : string = 'default'
Return values
int

dispatch()

Dispatch a task into given driver (store it).

public dispatch(TaskInterface $task[, string $driver = 'default' ][, bool $prepend = false ]) : string
Parameters
$task : TaskInterface
$driver : string = 'default'
$prepend : bool = false
Return values
string

isDriverOnline()

public isDriverOnline([string $driver = 'default' ]) : bool
Parameters
$driver : string = 'default'
Return values
bool

processScheduler()

Register a ScheduleProviderInterface (Symfony Scheduler) programmatically.

public processScheduler([string $driver = 'default' ]) : void

Accept any ScheduleProviderInterface implementation (or add providers via config).

Parameters
$driver : string = 'default'

runWorker()

Run worker loop which: - polls the given driver - skips messages whose available_at > now (delayed) - dispatches to the MessageBus - on exception applies retry policy and requeues with exponential backoff

public runWorker([string $driver = 'default' ][, array{sleep_ms?: int, limit?: int} $opts = [] ]) : void

This is a simple single-process worker. Use Supervisor/systemd to run many processes.

Parameters
$driver : string = 'default'
$opts : array{sleep_ms?: int, limit?: int} = []

transferAll()

Transfer all messages between drivers (with optional transform callback).

public transferAll(string $from, string $to[, bool $prepend = false ][, callable|null $transform = null ]) : array<string, string>
Parameters
$from : string
$to : string
$prepend : bool = false
$transform : callable|null = null

(array $envelope): array

Return values
array<string, string>

map oldId => newId

createDriverFromDsn()

Create driver based on DSN. Supported: redis://..., doctrine:// or mysql://

private createDriverFromDsn(string $dsn[, array<string|int, mixed> $options = [] ]) : MessageDriverInterface
Parameters
$dsn : string
$options : array<string|int, mixed> = []
Return values
MessageDriverInterface
On this page

Search results