Completed
Push — master ( 4bcd21...430cdc )
by Dmitry
12:00
created

PublishToQueueListener::handle()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 13
c 1
b 0
f 0
rs 9.4285
cc 3
eloc 8
nc 4
nop 1
1
<?php
2
3
namespace hiapi\event;
4
5
use League\Event\AbstractListener;
6
use League\Event\EventInterface;
7
use PhpAmqpLib\Connection\AMQPStreamConnection;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use Psr\Log\LoggerInterface;
11
use yii\base\InvalidConfigException;
12
13
/**
14
 * Class PublishToQueueListener
15
 *
16
 * @author Dmytro Naumenko <[email protected]>
17
 */
18
class PublishToQueueListener extends AbstractListener
19
{
20
    /**
21
     * @var AMQPStreamConnection
22
     */
23
    protected $amqp;
24
25
    /**
26
     * @var LoggerInterface
27
     */
28
    private $logger;
29
30
    /**
31
     * @var AMQPChannel
32
     */
33
    protected $channel;
34
35
    /**
36
     * @var string the queue name for the published messages
37
     */
38
    public $queue;
39
40
    public function __construct(AMQPStreamConnection $amqp, LoggerInterface $logger)
41
    {
42
        $this->amqp = $amqp;
43
        $this->logger = $logger;
44
    }
45
46
    /**
47
     * Handle an event.
48
     * @param EventInterface $event
49
     * @return void
50
     */
51
    public function handle(EventInterface $event): void
52
    {
53
        if ($this->queue === null) {
54
            throw new \RuntimeException('Property PublishToQueueListener::queue must be set');
55
        }
56
57
        try {
58
            $message = $this->createMessage($event);
59
            $this->getChannel()->basic_publish($message, '', $this->queue);
60
        } catch (InvalidConfigException $exception) {
61
            $this->logger->critical($exception->getMessage());
62
        }
63
    }
64
65
    protected function getChannel(): AMQPChannel
66
    {
67
        if ($this->channel === null) {
68
            $this->channel = $channel = $this->amqp->channel();
69
            $channel->queue_declare($this->queue, false, true, false, false);
70
        }
71
72
        return $this->channel;
73
    }
74
75
    private function createMessage($event): AMQPMessage
76
    {
77
        if (!$event instanceof \JsonSerializable) {
78
            throw new InvalidConfigException('Event "' . get_class($event) . '" can not be sent to queue');
79
        }
80
81
        return new AMQPMessage(json_encode($event->jsonSerialize(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES), [
82
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
83
            'content_type' => 'application/json',
84
        ]);
85
    }
86
}
87