RabbitMQDataManager::getConnection()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
3
namespace UniMan\Drivers\RabbitMQ;
4
5
use Nette\Localization\ITranslator;
6
use PhpAmqpLib\Connection\AMQPStreamConnection;
7
use UniMan\Core\DataManager\AbstractDataManager;
8
use UniMan\Core\Utils\Filter;
9
use UniMan\Core\Utils\Multisort;
10
11
class RabbitMQDataManager extends AbstractDataManager
12
{
13
    private $translator;
14
15
    /** @var AMQPStreamConnection */
16
    private $connection;
17
18
    private $credentials = [];
19
20
    private $client;
21
22
    private $vhost;
23
24 2
    public function __construct(array $credentials, RabbitMQManagementApiClient $client, ITranslator $translator)
25
    {
26 2
        $this->credentials = $credentials;
27 2
        $this->client = $client;
28 2
        $this->translator = $translator;
29 2
    }
30
31
    public function getConnection()
32
    {
33
        return $this->connection;
34
    }
35
36
    public function databases(array $sorting = [])
37
    {
38
        $vhosts = [];
39
        foreach ($this->client->getVhosts($this->credentials['user']) as $vhost) {
40
            $vhosts[$vhost['name']] = [
41
                'vhost' => $vhost['name'],
42
                'queues' => count($this->client->getQueues($vhost['name'])),
43
                'messages' => isset($vhost['messages']) ? $vhost['messages'] : 0,
44
            ];
45
        }
46
        return Multisort::sort($vhosts, $sorting);
47
    }
48
49
    protected function getDatabaseNameColumn()
50
    {
51
        return 'vhost';
52
    }
53
54
    public function selectDatabase($vhost)
55
    {
56
        $this->vhost = $vhost;
57
        $this->connection = new AMQPStreamConnection(
58
            $this->credentials['host'],
59
            $this->credentials['port'],
60
            $this->credentials['user'],
61
            $this->credentials['password'],
62
            $vhost
63
        );
64
        return $this->connection;
65
    }
66
67
    public function tables(array $sorting = [])
68
    {
69
        $tables = [
70
            RabbitMQDriver::TYPE_QUEUE => [],
71
        ];
72
        foreach ($this->client->getQueues($this->vhost) as $queue) {
73
            $tables[RabbitMQDriver::TYPE_QUEUE][$queue['name']] = [
74
                'queue' => $queue['name'],
75
                'number_of_items' => $queue['messages'],
76
                'size' => $queue['message_bytes'],
77
            ];
78
        }
79
        return [
80
            RabbitMQDriver::TYPE_QUEUE => Multisort::sort($tables[RabbitMQDriver::TYPE_QUEUE], $sorting),
81
        ];
82
    }
83
84
    public function itemsCount($type, $table, array $filter = [])
85
    {
86
        if ($type != RabbitMQDriver::TYPE_QUEUE) {
87
            return 0;
88
        }
89
        if (!$filter) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $filter of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
90
            foreach ($this->client->getQueues($this->vhost) as $queue) {
91
                if ($queue['name'] == $table) {
92
                    return $queue['messages'];
93
                }
94
            }
95
        }
96
        $totalCount = 0;
97
        foreach ($this->client->getMessages($this->vhost, $table) as $message) {
98
            $item = [
99
                'message_body' => $message['payload'],
100
                'length' => $message['payload_bytes'],
101
                'is_truncated' => false ? $this->translator->translate('core.yes') : $this->translator->translate('core.no'),
102
                'content_encoding' => $message['payload_encoding'],
103
                'redelivered' => $message['redelivered'] ? $this->translator->translate('core.yes') : $this->translator->translate('core.no'),
104
            ];
105
            if (Filter::apply($item, $filter)) {
106
                $totalCount++;
107
            }
108
        }
109
110
        return $totalCount;
111
    }
112
113
    public function items($type, $table, $page, $onPage, array $filter = [], array $sorting = [])
114
    {
115
        if ($type != RabbitMQDriver::TYPE_QUEUE) {
116
            return [];
117
        }
118
        $items = [];
119
        foreach ($this->client->getMessages($this->vhost, $table) as $message) {
120
            $item = [
121
                'message_body' => $message['payload'],
122
                'length' => $message['payload_bytes'],
123
                'is_truncated' => false ? $this->translator->translate('core.yes') : $this->translator->translate('core.no'),
124
                'content_encoding' => $message['payload_encoding'],
125
                'redelivered' => $message['redelivered'] ? $this->translator->translate('core.yes') : $this->translator->translate('core.no'),
126
            ];
127
            if (!Filter::apply($item, $filter)) {
128
                continue;
129
            }
130
            $items[$message['payload']] = $item;
131
        }
132
        $items = Multisort::sort($items, $sorting);
133
        return array_slice($items, ($page - 1) * $onPage, $onPage, true);
134
    }
135
136
    public function deleteTable($type, $queue)
137
    {
138
        $channel = $this->connection->channel();
139
        $channel->queue_delete($queue);
140
        $channel->close();
141
        $this->connection->close();
142
        return true;
143
    }
144
}
145