Passed
Push — master ( ce0bdd...5bd05b )
by
unknown
51s queued 12s
created

Consumer::consume()   B

Complexity

Conditions 6
Paths 7

Size

Total Lines 34
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 6
eloc 21
c 3
b 1
f 0
nc 7
nop 1
dl 0
loc 34
ccs 0
cts 19
cp 0
crap 42
rs 8.9617
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue;
6
7
use Throwable;
8
use LogicException;
9
use RuntimeException;
10
use DateTimeImmutable;
11
use Doctrine\DBAL\Connection;
12
use InvalidArgumentException;
13
use Doctrine\DBAL\Types\Types;
14
use Doctrine\DBAL\Schema\SchemaException;
15
16
/**
17
 * Class Consumer
18
 * @package Simple\Queue
19
 */
20
class Consumer
21
{
22
    /** Processor result: the message was handled and is removed from the queue */
    public const STATUS_ACK = 'ACK';

    /** Processor result: the message is removed from the queue without being requeued */
    public const STATUS_REJECT = 'REJECT';

    /** Processor result: the message is removed and sent again through the producer */
    public const STATUS_REQUEUE = 'REQUEUE';

    /** @var Connection Database connection used to read and delete queue rows */
    protected Connection $connection;

    /** @var Producer Producer used to send messages that have to be requeued */
    protected Producer $producer;

    /**
     * Never null after construction: the constructor falls back to Config::getDefault().
     *
     * @var Config
     */
    private Config $config;

    /** @var array<string, callable> Processors registered through bind(), keyed by queue name */
    protected array $processors = [];
42
43
    /**
     * Builds a consumer on top of a database connection and a producer.
     *
     * @param Connection $connection
     * @param Producer $producer
     * @param Config|null $config optional settings; Config::getDefault() is used when omitted
     */
    public function __construct(Connection $connection, Producer $producer, ?Config $config = null)
    {
        $this->connection = $connection;
        $this->producer = $producer;

        if ($config === null) {
            $config = Config::getDefault();
        }

        $this->config = $config;
    }
55
56
    /**
     * Fetch the next message from the queue.
     *
     * Selects a single row whose status is NEW or REDELIVERED, whose redelivery time
     * (if any) has passed and whose exact_time is not in the future, ordered by
     * priority and then created_at, both ascending. The query is repeated for up to
     * 200ms until a row is found.
     *
     * @param array $queues queue names to restrict the search to; an empty array means no restriction
     * @return Message|null the hydrated message, or null when nothing was found in time
     * @throws RuntimeException when the query or the hydration fails (original error attached as previous)
     */
    public function fetchMessage(array $queues = []): ?Message
    {
        $nowTime = time();
        $endAt = microtime(true) + 0.2; // add 200ms

        $select = $this->connection->createQueryBuilder()
            ->select('*')
            ->from(QueueTableCreator::getTableName())
            ->andWhere('status IN (:statuses)')
            ->andWhere('redelivered_at IS NULL OR redelivered_at <= :redeliveredAt')
            ->andWhere('exact_time <= :nowTime')
            ->addOrderBy('priority', 'asc')
            ->addOrderBy('created_at', 'asc')
            ->setParameter('redeliveredAt', new DateTimeImmutable('now'), Types::DATETIME_IMMUTABLE)
            ->setParameter('statuses', [Status::NEW, Status::REDELIVERED], Connection::PARAM_STR_ARRAY)
            ->setParameter('nowTime', $nowTime, Types::INTEGER)
            ->setMaxResults(1);

        if ($queues !== []) {
            // andWhere(), not where(): where() would replace the status, redelivery
            // and exact_time restrictions set above instead of adding to them.
            $select
                ->andWhere('queue IN (:queues)')
                ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY);
        }

        // NOTE(review): this loop re-runs the query back to back without pausing
        // until the 200ms window is over — confirm that this is intended.
        while (microtime(true) < $endAt) {
            try {
                $deliveredMessage = $select->execute()->fetchAssociative();

                if (empty($deliveredMessage)) {
                    continue;
                }

                return MessageHydrator::createMessage($deliveredMessage);
            } catch (Throwable $e) {
                // Keep the original exception reachable through getPrevious().
                throw new RuntimeException(sprintf('Error reading queue in consumer: "%s".', $e), 0, $e);
            }
        }

        return null;
    }
102
103
    /**
     * Removes the row with the given id from the queue table.
     *
     * @param string $id identifier of the row to delete, bound as a GUID
     * @throws LogicException when $id is empty
     * @throws \Doctrine\DBAL\Exception
     */
    protected function deleteMessage(string $id): void
    {
        if (!empty($id)) {
            $criteria = ['id' => $id];
            $criteriaTypes = ['id' => Types::GUID];

            $this->connection->delete(QueueTableCreator::getTableName(), $criteria, $criteriaTypes);

            return;
        }

        throw new LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s".', $id));
    }
121
122
    /**
     * Builds the message that is sent back to the queue for redelivery.
     *
     * The new message takes queue, body, priority and event from the given one.
     * Its redelivery time is the given message's own value when set, otherwise
     * "now" plus the configured redelivery delay. Its status is UNDEFINED_HANDLER
     * when the given message has that status, and REDELIVERED in every other case.
     *
     * NOTE(review): attempts and error of the given message are not copied here —
     * confirm whether they are expected to survive a redelivery.
     *
     * @param Message $message
     * @return Message
     */
    protected function forRedeliveryMessage(Message $message): Message
    {
        // $this->config is always assigned by the constructor.
        $now = new DateTimeImmutable('now');
        $delay = sprintf('+%s seconds', $this->config->getRedeliveryTimeInSeconds());
        $redeliveredTime = $now->modify($delay);

        $copy = new Message($message->getQueue(), $message->getBody());
        $copy = $copy->changePriority($message->getPriority());
        $copy = $copy->setEvent($message->getEvent());
        $copy = $copy->setRedeliveredAt($message->getRedeliveredAt() ?: $redeliveredTime);

        $hydrator = new MessageHydrator($copy);

        $status = Status::REDELIVERED;
        if ($message->getStatus() === Status::UNDEFINED_HANDLER) {
            $status = Status::UNDEFINED_HANDLER;
        }

        return $hydrator->changeStatus($status)->getMessage();
    }
146
147
    /**
     * Marks the message as handled by deleting it from the queue.
     *
     * @param Message $message
     * @throws \Doctrine\DBAL\Exception
     */
    public function acknowledge(Message $message): void
    {
        $messageId = $message->getId();

        $this->deleteMessage($messageId);
    }
157
158
    /**
     * Rejects a message: it is always deleted from the queue and, when requested,
     * a redelivery copy is sent through the producer afterwards.
     *
     * @param Message $message
     * @param bool $requeue true to send the message again after it has been deleted
     * @throws \Doctrine\DBAL\Exception
     */
    public function reject(Message $message, bool $requeue = false): void
    {
        $this->acknowledge($message);

        if (!$requeue) {
            return;
        }

        $redelivery = $this->forRedeliveryMessage($message);
        $this->producer->send($redelivery);
    }
173
174
    /**
     * Registers a processor for a queue.
     *
     * @param string $queue queue name; one or more of the characters 0-9, a-z, A-Z, ".", "_" and "-"
     * @param callable $processor invoked with the message and the producer, see processing()
     * @throws InvalidArgumentException when the queue name is empty or contains other characters
     * @throws RuntimeException when a processor is already registered for the queue
     */
    public function bind(string $queue, callable $processor): void
    {
        // preg_match() returns 1 on a match, 0 on no match and false only on error,
        // so everything other than 1 is treated as an invalid name. The "+" makes
        // the pattern cover the whole name instead of a single character.
        if (preg_match('/\A[0-9a-zA-Z._-]+\z/u', $queue) !== 1) {
            throw new InvalidArgumentException(sprintf('The queue "%s" contains invalid characters.', $queue));
        }

        if (isset($this->processors[$queue])) {
            throw new RuntimeException(sprintf('Queue "%s" is already registered in the processors.', $queue));
        }

        $this->processors[$queue] = $processor;
    }
192
193
    /**
     * Runs the endless consume loop.
     *
     * Creates the queue table first, then keeps fetching messages and passing them
     * to processing(). When nothing is fetched the loop sleeps for one second. When
     * processing throws, the error is stored on the message, which is marked as
     * FAILURE once its attempts reached the configured limit (otherwise its attempt
     * counter is increased), and the message is rejected with requeue.
     *
     * @param array $queues queue names to consume from; passed on to fetchMessage()
     * @throws SchemaException
     * @throws \Doctrine\DBAL\Exception
     */
    public function consume(array $queues = []): void
    {
        $tableCreator = new QueueTableCreator($this->connection);
        $tableCreator->createDataBaseTable();

        // TODO: check which queues are binding

        for (;;) {
            $message = $this->fetchMessage($queues);

            if ($message === null) {
                // Nothing to do right now, wait before polling again.
                sleep(1);

                continue;
            }

            try {
                // TODO: set IN_PROCESS status

                $this->processing($message);
            } catch (Throwable $throwable) {
                $attemptsExhausted = $message->getAttempts() >= $this->config->getNumberOfAttemptsBeforeFailure();
                $hydrator = new MessageHydrator($message);

                if ($attemptsExhausted) {
                    $message = $hydrator
                        ->changeStatus(Status::FAILURE)
                        ->setError((string)$throwable)
                        ->getMessage();
                } else {
                    $message = $hydrator
                        ->setError((string)$throwable)
                        ->increaseAttempt()
                        ->getMessage();
                }

                try {
                    $this->reject($message, true);
                } catch (\Doctrine\DBAL\Exception $exception) {
                    // maybe lucky later
                }
            }
        }
    }
234
235
    /**
     * Passes a fetched message to its handler.
     *
     * For a job message the job is looked up in the config by the message event;
     * when the config has no such job, the event itself is used if it names an
     * existing class. The job is then instantiated and its handle() result is
     * passed to processResult(). A job that cannot be found, or that is not a
     * Job, is rejected with the FAILURE status.
     *
     * Any other message goes to the processor bound to its queue. Without such a
     * processor the message is rejected with the UNDEFINED_HANDLER status.
     *
     * @param Message $message
     * @throws \Doctrine\DBAL\Exception
     */
    protected function processing(Message $message): void
    {
        if ($message->isJob()) {
            if ($this->config->hasJob((string)$message->getEvent())) {
                $jobClass = $this->config->getJob((string)$message->getEvent());
            } elseif ($message->getEvent() && class_exists($message->getEvent())) {
                $jobClass = $message->getEvent();
            } else {
                $this->rejectByError(
                    $message,
                    sprintf('Could not find job: "%s".', $message->getEvent()),
                    Status::FAILURE
                );

                return;
            }

            // The third argument (allow_string) is required: without it is_a()
            // returns false whenever the first argument is a class-name string.
            if (is_a($jobClass, Job::class, true)) {
                /** @var Job $job */
                $job = new $jobClass(); // TODO: The job can inject custom dependencies
                $result = $job->handle($message, $this->producer);

                $this->processResult($result, $message);

                return;
            }

            $this->rejectByError(
                $message,
                sprintf('The job "%s" does not match the required parameters.', $message->getEvent()),
                Status::FAILURE
            );

            return;
        }

        if (isset($this->processors[$message->getQueue()])) {
            $result = $this->processors[$message->getQueue()]($message, $this->producer);

            $this->processResult($result, $message);

            return;
        }

        $this->rejectByError($message, 'Could not find any job or processor.', Status::UNDEFINED_HANDLER);
    }
285
286
    /**
     * Stores the given status and error on the message and rejects it with requeue.
     *
     * @param Message $message
     * @param string $error error text to store on the message
     * @param string $status status to set on the message before it is rejected
     * @throws \Doctrine\DBAL\Exception
     */
    protected function rejectByError(Message $message, string $error, string $status): void
    {
        $hydrator = new MessageHydrator($message);

        $failedMessage = $hydrator
            ->changeStatus($status)
            ->setError($error)
            ->getMessage();

        $this->reject($failedMessage, true);
    }
301
302
    /**
     * Applies the result returned by a job or processor to the message.
     *
     * STATUS_ACK acknowledges the message, STATUS_REJECT rejects it without
     * requeue and STATUS_REQUEUE rejects it with requeue.
     *
     * @param string $result one of the STATUS_* constants of this class
     * @param Message $message
     * @throws InvalidArgumentException when $result is none of the supported statuses
     * @throws \Doctrine\DBAL\Exception
     */
    protected function processResult(string $result, Message $message): void
    {
        $supported = [self::STATUS_ACK, self::STATUS_REJECT, self::STATUS_REQUEUE];

        if (!in_array($result, $supported, true)) {
            throw new InvalidArgumentException(sprintf('Unsupported result status: "%s".', $result));
        }

        if ($result === self::STATUS_ACK) {
            $this->acknowledge($message);

            return;
        }

        // Only REJECT and REQUEUE are left; they differ in the requeue flag alone.
        $this->reject($message, $result === self::STATUS_REQUEUE);
    }
329
}
330