Completed
Push — master ( a4a48b...00b680 )
by Tomas
26:02 queued 10:53
created

LazyRabbitMqDriver::setupPriorityQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Closure;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Connection\AMQPLazyConnection;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use Tomaj\Hermes\Dispatcher;
11
use Tomaj\Hermes\MessageInterface;
12
use Tomaj\Hermes\MessageSerializer;
13
use Tomaj\Hermes\Restart\RestartException;
14
15
/**
 * Hermes driver that publishes to and consumes from a single RabbitMQ queue
 * through an AMQPLazyConnection.
 *
 * The AMQP channel is opened (and the queue declared) only on first use, see
 * getChannel(). Priority queues are not supported by this driver.
 */
class LazyRabbitMqDriver implements DriverInterface
{
    use MaxItemsTrait;
    use RestartTrait;
    use SerializerAwareTrait;

    /** @var AMQPLazyConnection */
    private $connection;

    /** @var AMQPChannel|null Opened on first use by getChannel(); null before that and after wait() shuts down */
    private $channel;

    /** @var string Name of the queue used for both publishing and consuming */
    private $queue;

    /** @var array Properties passed to every published AMQPMessage */
    private $amqpMessageProperties = [];

    /** @var int Seconds to sleep between polls in wait(); 0 disables sleeping */
    private $refreshInterval;

    /** @var string Consumer tag passed to basic_consume */
    private $consumerTag;

    /**
     * @param AMQPLazyConnection $connection
     * @param string $queue
     * @param array $amqpMessageProperties
     * @param int $refreshInterval
     * @param string $consumerTag
     */
    public function __construct(
        AMQPLazyConnection $connection,
        string $queue,
        array $amqpMessageProperties = [],
        int $refreshInterval = 0,
        string $consumerTag = 'hermes'
    ) {
        $this->connection = $connection;
        $this->queue = $queue;
        $this->amqpMessageProperties = $amqpMessageProperties;
        $this->refreshInterval = $refreshInterval;
        $this->consumerTag = $consumerTag;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * Serializes the message and publishes it to the configured queue via the
     * default exchange (''). The $priority argument is accepted for interface
     * compatibility but is not used by this driver.
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool
    {
        $rabbitMessage = new AMQPMessage($this->serializer->serialize($message), $this->amqpMessageProperties);
        $this->getChannel()->basic_publish($rabbitMessage, '', $this->queue);
        return true;
    }

    /**
     * Priority queues are not supported by this driver; always throws.
     *
     * @param string $name
     * @param int $priority
     * @throws \Exception always
     */
    public function setupPriorityQueue(string $name, int $priority): void
    {
        throw new \Exception("LazyRabbitMqDriver is not supporting priority queues now");
    }

    /**
     * {@inheritdoc}
     *
     * Registers a consumer on the queue and polls the channel until
     * shouldProcessNext() returns false, then closes the channel and the
     * connection. The $priorities argument is not used by this driver.
     *
     * @throws RestartException
     * @throws \Exception
     */
    public function wait(Closure $callback, array $priorities = []): void
    {
        while (true) {
            $channel = $this->getChannel();

            // Positional flags: no_local=false, no_ack=true, exclusive=false, nowait=false.
            $channel->basic_consume(
                $this->queue,
                $this->consumerTag,
                false,
                true,
                false,
                false,
                function ($rabbitMessage) use ($callback) {
                    $message = $this->serializer->unserialize($rabbitMessage->body);
                    $callback($message);
                }
            );

            // When no consumer callbacks remain, fall through to the outer
            // loop which registers the consumer again.
            while (count($channel->callbacks)) {
                // Non-blocking wait so restart / max-items checks run on every pass.
                $channel->wait(null, true);
                $this->checkRestart();
                if (!$this->shouldProcessNext()) {
                    break 2;
                }
                if ($this->refreshInterval) {
                    sleep($this->refreshInterval);
                }
            }
        }

        $this->getChannel()->close();
        $this->connection->close();

        // Drop the closed channel so a later send()/wait() opens a fresh one
        // instead of reusing a channel that has already been closed.
        $this->channel = null;
    }

    /**
     * Returns the cached channel, opening it and declaring the queue on first use.
     *
     * @return AMQPChannel
     */
    private function getChannel(): AMQPChannel
    {
        if ($this->channel !== null) {
            return $this->channel;
        }

        $this->channel = $this->connection->channel();
        // Positional flags: passive=false, durable=false, exclusive=false, auto_delete=false.
        $this->channel->queue_declare($this->queue, false, false, false, false);

        return $this->channel;
    }
}
118