<?php namespace Yjtec\LQR\Queue\Connectors; //src/Queue/Connectors use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Queue\Connectors\ConnectorInterface; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Events\WorkerStopping; use Illuminate\Support\Arr; use InvalidArgumentException; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPLazyConnection; use Yjtec\LQR\Queue\RabbitMQQueue; use Yjtec\LQR\Horizon\RabbitMQQueue as HorizonRabbitMQQueue; class RabbitMQConnector implements ConnectorInterface { /** * @var Dispatcher */ private $dispatcher; public function __construct(Dispatcher $dispatcher) { $this->dispatcher = $dispatcher; } public function connect(array $config) { $connection = $this->createConnection($config); $queue = $this->createQueue( Arr::get($config, 'worker', 'default'), $connection, $config['queue'], Arr::get($config, 'options.queue', []) ); if (! $queue instanceof RabbitMQQueue) { throw new InvalidArgumentException('Invalid worker.'); } if ($queue instanceof HorizonRabbitMQQueue) { $this->dispatcher->listen(JobFailed::class, RabbitMQFailedEvent::class); } $this->dispatcher->listen(WorkerStopping::class, static function () use ($queue): void { $queue->close(); }); return $queue; } protected function createConnection(array $config) { $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); // manually disable heartbeat so long-running tasks will not fail Arr::set($config, 'options.heartbeat', 0); return $connection::create_connection( Arr::shuffle(Arr::get($config, 'hosts', [])), $this->filter(Arr::get($config, 'options', [])) ); } protected function createQueue(string $worker, AbstractConnection $connection, string $queue,array $options=[]) { switch ($worker) { case 'default': return new RabbitMQQueue($connection, $queue,$options); case 'horizon': return new HorizonRabbitMQQueue($connection, $queue,$options); default: return new $worker($connection, $queue,$options); } } private function filter(array $array): array { foreach ($array as $index => &$value) { if (is_array($value)) { $value = $this->filter($value); continue; } // If the value is null then remove it. if ($value === null) { unset($array[$index]); continue; } } return $array; } }