Topic::install()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 2
c 1
b 0
f 0
dl 0
loc 5
rs 10
cc 1
nc 1
nop 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ\Entity;
6
7
use Interop\Amqp\AmqpTopic;
8
use Psr\Log\LoggerInterface;
9
use Interop\Amqp\Impl\AmqpBind;
10
use BinaryCube\CarrotMQ\Connection;
11
use BinaryCube\CarrotMQ\Support\Collection;
12
use BinaryCube\CarrotMQ\Exception\Exception;
13
14
use function vsprintf;
15
use function filter_var;
16
use function array_reduce;
17
use function array_filter;
18
use function array_intersect_key;
19
20
/**
 * Class Topic
 *
 * Entity wrapping an AMQP topic (exchange): it can declare the topic, delete it,
 * check whether it exists and set up the bindings listed in its configuration.
 */
final class Topic extends Entity
{

    public const TYPE_DIRECT  = AmqpTopic::TYPE_DIRECT;
    public const TYPE_FANOUT  = AmqpTopic::TYPE_FANOUT;
    public const TYPE_TOPIC   = AmqpTopic::TYPE_TOPIC;
    public const TYPE_HEADERS = AmqpTopic::TYPE_HEADERS;

    /**
     * @const array Default exchange parameters
     *
     * - type:                                 passed to setType() in create().
     * - passive/durable/auto_delete/internal/nowait:
     *                                         each one that is strictly `true` adds the
     *                                         matching AmqpTopic::FLAG_* in create().
     * - arguments:                            passed to setArguments() in create() when not empty.
     * - bind:                                 list of binding definitions consumed by bind().
     * - auto_create:                          value reported by canAutoCreate().
     * - throw_exception_on_redeclare:         when strictly `true`, create() rethrows declare failures.
     * - throw_exception_on_bind_fail:         when strictly `true`, bind() rethrows binding failures.
     */
    public const DEFAULTS = [
        'type'                         => self::TYPE_DIRECT,

        'passive'                      => false,
        'durable'                      => true,
        'auto_delete'                  => false,
        'internal'                     => false,
        'nowait'                       => false,

        'arguments'                    => [],

        'bind'                         => [],

        'auto_create'                  => true,

        'throw_exception_on_redeclare' => true,
        'throw_exception_on_bind_fail' => true,
    ];

    /**
     * Constructor.
     *
     * Delegates to the parent constructor and then replaces the stored configuration
     * with DEFAULTS merged with the given $config (presumably caller values win;
     * that depends on Collection::merge(), which is not visible here).
     *
     * @param string               $id
     * @param string               $name
     * @param Connection           $connection
     * @param array                $config
     * @param LoggerInterface|null $logger
     */
    public function __construct(
        string $id,
        string $name,
        Connection $connection,
        array $config = [],
        ?LoggerInterface $logger = null
    ) {
        parent::__construct($id, $name, $connection, $config, $logger);

        // The class is final, so self:: and static:: resolve to the same constant.
        $this->config = Collection::make(self::DEFAULTS)->merge($config)->all();
    }

    /**
     * Build a fresh topic model for this entity's name from the current context.
     *
     * NOTE(review): static analysis reports that createTopic() is declared to return
     * Interop\Queue\Topic; the AmqpTopic return type below will raise a TypeError if
     * the context ever hands back a non-AMQP topic. Confirm the context is always AMQP.
     *
     * @return AmqpTopic
     */
    public function model(): AmqpTopic
    {
        return $this->context()->createTopic($this->name());
    }

    /**
     * Declare the topic using the configured type, flags and arguments.
     *
     * A declare failure is rethrown as the package Exception only when
     * `throw_exception_on_redeclare` is strictly `true`; otherwise it is ignored.
     *
     * @return $this
     *
     * @throws Exception
     */
    public function create()
    {
        $exchange = $this->model();

        // Configuration option => AMQP flag. The configuration key is `auto_delete`
        // (see DEFAULTS); `autoDelete` is kept as an alias because it was the only
        // spelling that had any effect before.
        $flagByOption = [
            'passive'     => AmqpTopic::FLAG_PASSIVE,
            'durable'     => AmqpTopic::FLAG_DURABLE,
            'auto_delete' => AmqpTopic::FLAG_AUTODELETE,
            'autoDelete'  => AmqpTopic::FLAG_AUTODELETE,
            'internal'    => AmqpTopic::FLAG_INTERNAL,
            'nowait'      => AmqpTopic::FLAG_NOWAIT,
        ];

        $flags = AmqpTopic::FLAG_NOPARAM;

        foreach ($flagByOption as $option => $flag) {
            // Only a strict boolean `true` enables a flag.
            if (($this->config[$option] ?? null) === true) {
                $flags |= $flag;
            }
        }

        $exchange->setType($this->config['type']);
        $exchange->setFlags($flags);

        if (! empty($this->config['arguments'])) {
            $exchange->setArguments($this->config['arguments']);
        }

        try {
            $this->context()->declareTopic($exchange);
        } catch (\Exception $exception) {
            if (true === $this->config['throw_exception_on_redeclare']) {
                throw new Exception($exception->getMessage(), $exception->getCode());
            }
        }

        // Also reached when a declare failure was ignored above.
        $this->logger->debug(vsprintf('Topic "%s" ("%s") has been created', [$this->id(), $this->name()]));

        return $this;
    }

    /**
     * Delete the topic through the current context.
     *
     * @return $this
     */
    public function delete()
    {
        $this->context()->deleteTopic($this->model());

        $this->logger->debug(vsprintf('Topic "%s" ("%s") has been deleted', [$this->id(), $this->name()]));

        return $this;
    }

    /**
     * Apply every binding listed under the `bind` configuration key.
     *
     * Each entry is merged over the defaults `queue`, `topic` and `routing_key`
     * (all empty strings). A non-empty `queue` and a non-empty `topic` each produce
     * one bind call between this topic and the named queue/topic. Nothing happens
     * when the `bind` list is empty.
     *
     * A binding failure is rethrown as the package Exception only when
     * `throw_exception_on_bind_fail` is strictly `true`; otherwise the loop moves on
     * to the next entry.
     *
     * @return $this
     *
     * @throws Exception
     */
    public function bind()
    {
        if (empty($this->config['bind'])) {
            return $this;
        }

        $default = [
            'queue'       => '',
            'topic'       => '',
            'routing_key' => '',
        ];

        foreach ($this->config['bind'] as $bind) {
            try {
                $bind = Collection::make($default)->merge($bind)->all();

                if (! empty($bind['queue'])) {
                    $queue     = $this->context()->createQueue($bind['queue']);
                    $queueBind = new AmqpBind($this->model(), $queue, $bind['routing_key']);

                    $this->context()->bind($queueBind);
                }

                if (! empty($bind['topic'])) {
                    $topic        = $this->context()->createTopic($bind['topic']);
                    $exchangeBind = new AmqpBind($this->model(), $topic, $bind['routing_key']);

                    $this->context()->bind($exchangeBind);
                }
            } catch (\Exception $exception) {
                if (true === $this->config['throw_exception_on_bind_fail']) {
                    throw new Exception($exception->getMessage(), $exception->getCode());
                }
            }
        }

        // Also reached when binding failures were ignored above.
        $this->logger->debug(vsprintf('Setup Topic Binds for "%s" - "%s"', [$this->id(), $this->name()]));

        return $this;
    }

    /**
     * Check for the topic by declaring it with only the passive flag set.
     *
     * @return bool `true` when the passive declare succeeds, `false` when it throws.
     */
    public function exists(): bool
    {
        try {
            $exchange = $this->model();

            $exchange->setFlags(AmqpTopic::FLAG_PASSIVE);

            $this->context()->declareTopic($exchange);
        } catch (\Exception $exception) {
            // Any failure is reported as "does not exist".
            return false;
        }

        return true;
    }

    /**
     * No-op: nothing is purged for a topic.
     *
     * @return $this
     */
    public function purge()
    {
        return $this;
    }

    /**
     * Report the `auto_create` option, interpreted with FILTER_VALIDATE_BOOLEAN.
     *
     * @return bool
     */
    public function canAutoCreate(): bool
    {
        return filter_var($this->config['auto_create'], FILTER_VALIDATE_BOOLEAN);
    }

    /**
     * Declare the topic and then apply its configured bindings.
     *
     * @return $this
     *
     * @throws Exception
     */
    public function install()
    {
        $this->create();
        $this->bind();

        return $this;
    }

}
245