RabbitmqDriver   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 97
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 9
lcom 1
cbo 2
dl 0
loc 97
ccs 36
cts 36
cp 1
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A getChannel() 0 4 1
A getTimeout() 0 4 1
A send() 0 13 1
B receive() 0 25 3
A declareQueue() 0 10 2
1
<?php
2
3
namespace Tasksuki\Component\RabbitmqDriver;
4
5
use Throwable;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Channel\AbstractChannel;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Tasksuki\Component\Driver\DriverInterface;
10
11
/**
 * Class RabbitmqDriver
 *
 * Driver that publishes messages to, and consumes messages from, RabbitMQ
 * queues through a single php-amqplib channel. Queues are declared lazily
 * (once per driver instance) and one consumer is kept per queue.
 *
 * @package Tasksuki\Component\RabbitmqDriver
 * @author  Aurimas Niekis <[email protected]>
 */
class RabbitmqDriver implements DriverInterface
{
    /**
     * Channel every broker operation goes through.
     *
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Queues already declared by this instance, keyed by queue name.
     *
     * @var array
     */
    private $queuesDeclared;

    /**
     * Most recent handler passed to receive(), keyed by queue name.
     *
     * @var callable[]
     */
    private $handlers;

    /**
     * Consumer tags returned by basic_consume(), keyed by queue name.
     *
     * @var string[]
     */
    private $consumerTags;

    /**
     * Timeout handed to the channel's wait() call in receive().
     *
     * @var int
     */
    private $timeout;

    /**
     * @param AbstractChannel $channel Must be an AMQPChannel instance; the wider type hint is kept
     *                                 only for backward compatibility of the signature.
     * @param int             $timeout Timeout passed to the channel's wait() call.
     *
     * @throws \InvalidArgumentException When $channel is not an AMQPChannel.
     */
    public function __construct(AbstractChannel $channel, int $timeout = 0)
    {
        // getChannel() is declared to return AMQPChannel, so any other AbstractChannel would only
        // fail later with a TypeError on first use. Reject it here with a clear message instead.
        if (!$channel instanceof AMQPChannel) {
            throw new \InvalidArgumentException(
                sprintf('Expected an instance of %s, got %s', AMQPChannel::class, get_class($channel))
            );
        }

        $this->channel        = $channel;
        $this->queuesDeclared = [];
        $this->handlers       = [];
        $this->consumerTags   = [];
        $this->timeout        = $timeout;
    }

    /**
     * @return AMQPChannel
     */
    public function getChannel(): AMQPChannel
    {
        return $this->channel;
    }

    /**
     * @return int
     */
    public function getTimeout(): int
    {
        return $this->timeout;
    }

    /**
     * Publishes $message as a persistent message to the queue $name via the default exchange
     * (empty exchange name, queue name used as routing key). The queue is declared first if this
     * instance has not declared it yet.
     *
     * @inheritdoc
     */
    public function send(string $name, string $message): bool
    {
        $this->declareQueue($name);

        $msg = new AMQPMessage(
            $message,
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->getChannel()->basic_publish($msg, '', $name);

        return true;
    }

    /**
     * Makes sure a consumer exists for the queue $name and then lets the channel process pending
     * frames once.
     *
     * The consumer is registered only the first time (or again if the channel no longer tracks
     * its tag), so polling this method in a loop does not pile up consumers on the broker.
     * Deliveries are always dispatched to the handler given in the most recent call for $name.
     *
     * The handler receives the raw message body. When it returns normally the message is
     * acknowledged; when anything is thrown the message is rejected with requeue and the
     * throwable is rethrown to the caller.
     *
     * @inheritdoc
     */
    public function receive(string $name, callable $handler)
    {
        $this->declareQueue($name);

        // Store the handler before consuming: the consumer callback looks it up at delivery time.
        $this->handlers[$name] = $handler;

        if (!$this->isConsuming($name)) {
            $this->startConsuming($name);
        }

        if (count($this->getChannel()->callbacks) > 0) {
            // Second argument is true as in the original call (php-amqplib's non-blocking flag).
            $this->getChannel()->wait(null, true, $this->getTimeout());
        }
    }

    /**
     * Tells whether a consumer registered by this driver for $name is still tracked by the channel.
     */
    private function isConsuming(string $name): bool
    {
        if (!isset($this->consumerTags[$name])) {
            return false;
        }

        // The channel keeps its consumer callbacks keyed by consumer tag; a missing entry means
        // the consumer is gone and has to be registered again.
        return isset($this->getChannel()->callbacks[$this->consumerTags[$name]]);
    }

    /**
     * Registers a consumer on the queue $name with a prefetch count of 1 and remembers its tag.
     */
    private function startConsuming(string $name)
    {
        $callback = function (AMQPMessage $message) use ($name) {
            try {
                $handler = $this->handlers[$name];

                $handler($message->body);

                $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            } catch (Throwable $e) {
                // Hand the message back to the queue (requeue = true) before propagating the failure.
                $message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], true);

                throw $e;
            }
        };

        $channel = $this->getChannel();

        $channel->basic_qos(null, 1, null);
        $consumerTag = $channel->basic_consume($name, '', false, false, false, false, $callback);

        // Only remember a usable tag. Without one the next receive() call simply registers again,
        // which is what every call did before tags were tracked.
        if (is_string($consumerTag) && $consumerTag !== '') {
            $this->consumerTags[$name] = $consumerTag;
        }
    }

    /**
     * Declares the queue $name as durable, at most once per driver instance.
     */
    private function declareQueue(string $name)
    {
        if (isset($this->queuesDeclared[$name])) {
            return;
        }

        // Arguments after the name: passive = false, durable = true, exclusive = false, auto_delete = false.
        $this->getChannel()->queue_declare($name, false, true, false, false);

        $this->queuesDeclared[$name] = true;
    }
}