Issues (26)

src/Entity/Queue.php (1 issue)

1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ\Entity;
6
7
use Psr\Log\LoggerInterface;
8
use Interop\Amqp\AmqpQueue;
9
use Interop\Amqp\AmqpContext;
10
use Interop\Amqp\Impl\AmqpBind;
11
use BinaryCube\CarrotMQ\Connection;
12
use BinaryCube\CarrotMQ\Support\Collection;
13
use BinaryCube\CarrotMQ\Exception\Exception;
14
15
use function vsprintf;
16
use function filter_var;
17
use function array_filter;
18
use function array_reduce;
19
use function array_intersect_key;
20
21
/**
22
 * Class Queue
23
 */
24
final class Queue extends Entity
25
{
26
27
    /**
28
     * @const array Default queue parameters
29
     */
30
    const DEFAULTS = [
31
        'passive'                      => false,
32
        'durable'                      => false,
33
        'exclusive'                    => false,
34
        'auto_delete'                  => false,
35
        'nowait'                       => false,
36
37
        'arguments'                    => [],
38
39
        'bind'                         => [],
40
41
        'auto_create'                  => true,
42
43
        'throw_exception_on_redeclare' => true,
44
        'throw_exception_on_bind_fail' => true,
45
    ];
46
47
    /**
48
     * Constructor.
49
     *
50
     * @param string               $id
51
     * @param string               $name
52
     * @param Connection           $connection
53
     * @param array                $config
54
     * @param LoggerInterface|null $logger
55
     */
56
    public function __construct(
57
        string $id,
58
        string $name,
59
        Connection $connection,
60
        array $config = [],
61
        ?LoggerInterface $logger = null
62
    ) {
63
        parent::__construct($id, $name, $connection, $config, $logger);
64
65
        $this->config = Collection::make(static::DEFAULTS)->merge($config)->all();
66
    }
67
68
    /**
69
     * @return AmqpQueue
70
     */
71
    public function model(): AmqpQueue
72
    {
73
        return $this->context()->createQueue($this->name());
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->context()-...ateQueue($this->name()) returns the type Interop\Queue\Queue which includes types incompatible with the type-hinted return Interop\Amqp\AmqpQueue.
Loading history...
74
    }
75
76
    /**
77
     * @return $this
78
     *
79
     * @throws Exception
80
     */
81
    public function create()
82
    {
83
        $queue = $this->model();
84
85
        $properties = [
86
            'passive'     => AmqpQueue::FLAG_PASSIVE,
87
            'durable'     => AmqpQueue::FLAG_DURABLE,
88
            'exclusive'   => AmqpQueue::FLAG_EXCLUSIVE,
89
            'auto_delete' => AmqpQueue::FLAG_AUTODELETE,
90
            'nowait'      => AmqpQueue::FLAG_NOWAIT,
91
        ];
92
93
        $flags = array_reduce(
94
            array_intersect_key(
95
                $properties,
96
                array_filter(
97
                    $this->config,
98
                    function ($value) {
99
                        return $value === true;
100
                    }
101
                )
102
            ),
103
            function ($a, $b) {
104
                return ($a | $b);
105
            },
106
            (AmqpQueue::FLAG_NOPARAM)
107
        );
108
109
        $queue->setFlags($flags);
110
111
        if (! empty($this->config['arguments'])) {
112
            $queue->setArguments($this->config['arguments']);
113
        }
114
115
        try {
116
            $this->context()->declareQueue($queue);
117
        } catch (\Exception $exception) {
118
            if (true === $this->config['throw_exception_on_redeclare']) {
119
                throw new Exception($exception->getMessage(), $exception->getCode());
120
            }
121
        }
122
123
        $this->logger->debug(vsprintf('Queue "%s" ("%s") has been created', [$this->id(), $this->name()]));
124
125
        return $this;
126
    }
127
128
    /**
129
     * @return $this
130
     */
131
    public function delete()
132
    {
133
        $this->context()->deleteQueue($this->model());
134
135
        $this->logger->debug(vsprintf('Queue "%s" ("%s") has been deleted', [$this->id(), $this->name()]));
136
137
        return $this;
138
    }
139
140
    /**
141
     * @return $this
142
     *
143
     * @throws Exception
144
     */
145
    public function bind()
146
    {
147
        if (empty($this->config['bind'])) {
148
            return $this;
149
        }
150
151
        $default = [
152
            'topic'       => '',
153
            'routing_key' => '',
154
        ];
155
156
        foreach ($this->config['bind'] as $bind) {
157
            try {
158
                $bind = Collection::make($default)->merge($bind)->all();
159
160
                if (empty($bind['topic'])) {
161
                    return $this;
162
                }
163
164
                $exchange = $this->context()->createTopic($bind['topic']);
165
                $bind     = new AmqpBind($this->model(), $exchange, $bind['routing_key']);
166
167
                $this->context()->bind($bind);
168
            } catch (\Exception $exception) {
169
                if (true === $this->config['throw_exception_on_bind_fail']) {
170
                    throw new Exception($exception->getMessage(), $exception->getCode());
171
                }
172
            }//end try
173
        }//end foreach
174
175
        $this->logger->debug(vsprintf('Setup Queue Binds for "%s" ("%s")', [$this->id(), $this->name()]));
176
177
        return $this;
178
    }
179
180
    /**
181
     * @return boolean
182
     */
183
    public function exists(): bool
184
    {
185
        $result = false;
186
187
        try {
188
            $queue = $this->model();
189
190
            $queue->setFlags(AmqpQueue::FLAG_PASSIVE);
191
192
            $this->context()->deleteQueue($queue);
193
194
            $result = true;
195
        } catch (\Exception $exception) {
196
            // Do nothing.
197
        }
198
199
        return $result;
200
    }
201
202
    /**
203
     * @return $this
204
     */
205
    public function purge()
206
    {
207
        try {
208
            $this->context()->purgeQueue($this->model());
209
        } catch (\Exception $exception) {
210
            // Do nothing.
211
        }
212
213
        $this->logger->debug(vsprintf('Queue "%s"("%s") has been purged', [$this->id(), $this->name()]));
214
215
        return $this;
216
    }
217
218
    /**
219
     * @return boolean
220
     */
221
    public function canAutoCreate(): bool
222
    {
223
        return filter_var($this->config['auto_create'], FILTER_VALIDATE_BOOLEAN);
224
    }
225
226
    /**
227
     * @return $this
228
     *
229
     * @throws Exception
230
     */
231
    public function install()
232
    {
233
        $this->create()->bind();
234
235
        return $this;
236
    }
237
238
}
239