Completed
Push — master ( a4a48b...00b680 )
by Tomas
26:02 queued 10:53
created

RabbitMqDriver::setupPriorityQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Closure;
7
use Exception;
8
use Tomaj\Hermes\Dispatcher;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
use PhpAmqpLib\Message\AMQPMessage;
12
use PhpAmqpLib\Channel\AMQPChannel;
13
14
/**
 * Class RabbitMqDriver
 *
 * Hermes driver that publishes and consumes messages through a php-amqplib
 * channel supplied by the caller. It works with exactly one queue and has no
 * priority queue support.
 *
 * @package Tomaj\Hermes\Driver
 *
 * @deprecated use LazyRabbitMqDriver instead
 */
class RabbitMqDriver implements DriverInterface
{
    use SerializerAwareTrait;

    /**
     * Channel used for both publishing and consuming.
     *
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Name of the queue; used as routing key on publish and as the consumed queue.
     *
     * @var string
     */
    private $queue;

    /**
     * Properties passed to every AMQPMessage created by send().
     *
     * @var array
     */
    private $amqpMessageProperties = [];

    /**
     * Create new RabbitMqDriver with provided channel.
     *
     * You have to create the connection to RabbitMQ and set up the queue outside of this class.
     * Handling the connection is up to you and you have to manage it yourself.
     *
     * @see examples/rabbitmq folder
     *
     * @param AMQPChannel   $channel               channel used for publishing and consuming
     * @param string        $queue                 name of the queue to publish to / consume from
     * @param array         $amqpMessageProperties properties handed to every published AMQPMessage
     */
    public function __construct(AMQPChannel $channel, string $queue, array $amqpMessageProperties = [])
    {
        $this->channel = $channel;
        $this->queue = $queue;
        $this->amqpMessageProperties = $amqpMessageProperties;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * The $priority argument is accepted for interface compatibility only;
     * this driver does not use it and always publishes to the single configured queue.
     *
     * @return bool always true once basic_publish() returns
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool
    {
        $rabbitMessage = new AMQPMessage($this->serializer->serialize($message), $this->amqpMessageProperties);
        // Empty exchange name + queue name as routing key: publish straight to the queue.
        $this->channel->basic_publish($rabbitMessage, '', $this->queue);
        return true;
    }

    /**
     * Priority queues are not available in this driver.
     *
     * @param string $name     ignored
     * @param int    $priority ignored
     *
     * @throws Exception always
     */
    public function setupPriorityQueue(string $name, int $priority): void
    {
        throw new Exception("RabbitMQ driver is not supporting priority queues now");
    }

    /**
     * {@inheritdoc}
     *
     * Registers a consumer on the configured queue and blocks while the channel
     * has registered callbacks. The $priorities argument is not used by this driver.
     *
     * Each delivery is unserialized, handed to $callback and acknowledged only
     * after $callback has returned; if $callback throws, no ack is sent for that delivery.
     */
    public function wait(Closure $callback, array $priorities): void
    {
        $this->channel->basic_consume(
            $this->queue,
            '',     // consumer tag
            false,  // no_local
            false,  // no_ack - deliveries are acknowledged manually below
            false,  // exclusive
            false,  // nowait
            function ($rabbitMessage) use ($callback) {
                $message = $this->serializer->unserialize($rabbitMessage->body);
                $callback($message);
                $rabbitMessage->delivery_info['channel']->basic_ack($rabbitMessage->delivery_info['delivery_tag']);
            }
        );

        while (count($this->channel->callbacks) > 0) {
            $this->channel->wait();
        }
    }
}
98