Passed
Pull Request — master (#20)
by Anatolyi
02:29
created

Consumer::consume()   A

Complexity

Conditions 6
Paths 8

Size

Total Lines 19
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 6
eloc 12
nc 8
nop 2
dl 0
loc 19
ccs 0
cts 11
cp 0
crap 42
rs 9.2222
c 3
b 1
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue;
6
7
use Throwable;
8
use InvalidArgumentException;
9
use Simple\Queue\Store\StoreInterface;
10
11
/**
12
 * Class Consumer
13
 *
14
 * TODO: 1 - create job instance
15
 * TODO: 2 - move db operations to other class
16
 * TODO: 3 - dispatch job
17
 * TODO: 4 - deserialize data to job and processor
18
 *
19
 * @package Simple\Queue
20
 */
21
class Consumer
22
{
23
    /** Use to mark results as successful */
24
    public const STATUS_ACK = 'ACK';
25
26
    /** Use one more try if necessary */
27
    public const STATUS_REJECT = 'REJECT';
28
29
    /** Use in case of a delayed queue */
30
    public const STATUS_REQUEUE = 'REQUEUE';
31
32
    /** @var StoreInterface */
33
    protected StoreInterface $store;
34
35
    /** @var Producer */
36
    protected Producer $producer;
37
38
    /** @var Config|null */
39
    private ?Config $config;
40
41
    /**
42
     * Consumer constructor.
43
     * @param StoreInterface $store
44
     * @param Producer $producer
45
     * @param Config|null $config
46
     */
47 3
    public function __construct(StoreInterface $store, Producer $producer, ?Config $config = null)
48
    {
49 3
        $this->store = $store;
50 3
        $this->producer = $producer;
51 3
        $this->config = $config ?: Config::getDefault();
52 3
    }
53
54
    /**
55
     * The message has been successfully processed and will be removed from the queue
56
     *
57
     * @param Message $message
58
     */
59 3
    public function acknowledge(Message $message): void
60
    {
61 3
        $this->store->deleteMessage($message);
62 3
    }
63
64
    /**
65
     * Reject message with requeue option
66
     *
67
     * @param Message $message
68
     * @param bool $requeue
69
     */
70 2
    public function reject(Message $message, bool $requeue = false): void
71
    {
72 2
        $this->acknowledge($message);
73
74 2
        if ($requeue) {
75 1
            $redeliveryMessage = $this->producer->makeRedeliveryMessage($message);
76 1
            $this->producer->send($redeliveryMessage);
77
        }
78 2
    }
79
80
    /**
81
     * TODO: pass the processing message status to $eachCallback
82
     *
83
     * @param array $queues
84
     * @param callable|null $eachCallback
85
     */
86
    public function consume(array $queues = [], ?callable $eachCallback = null): void
87
    {
88
        $this->store->init();
89
90
        while (true) {
91
            if ($message = $this->store->fetchMessage($queues)) {
92
                try {
93
                    $this->processing($message);
94
                } catch (Throwable $throwable) {
95
                    try {
96
                        $this->processFailureResult($throwable, $message);
97
                    } catch (\Doctrine\DBAL\Exception $exception) {
98
                        // maybe lucky later
99
                    }
100
                }
101
                $eachCallback && $eachCallback($message, $throwable ?? null);
102
                continue;
103
            }
104
            usleep(200000); // 0.2 second
105
        }
106
    }
107
108
    /**
109
     * @param Message $message
110
     */
111
    protected function processing(Message $message): void
112
    {
113
        $this->store->changeMessageStatus($message, new Status(Status::IN_PROCESS));
114
115
        if ($message->isJob()) {
116
            try {
117
                $job = $this->config->getJob($message->getEvent());
0 ignored issues
show
Bug introduced by
The method getJob() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

117
                /** @scrutinizer ignore-call */ 
118
                $job = $this->config->getJob($message->getEvent());

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
Bug introduced by
It seems like $message->getEvent() can also be of type null; however, parameter $jobName of Simple\Queue\Config::getJob() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

117
                $job = $this->config->getJob(/** @scrutinizer ignore-type */ $message->getEvent());
Loading history...
118
119
                $result = $job->handle($message, $this->producer);
120
121
                $this->processSuccessResult($result, $message);
122
            } catch (Throwable $exception) {
123
                $this->processFailureResult($exception, $message);
124
            }
125
126
            return;
127
        }
128
129
        if ($this->config->hasProcessor($message->getQueue())) {
130
            try {
131
                $result = $this->config->getProcessor($message->getQueue())($message, $this->producer);
132
133
                $this->processSuccessResult($result, $message);
134
            } catch (Throwable $exception) {
135
                $this->processFailureResult($exception, $message);
136
            }
137
138
            return;
139
        }
140
141
        $this->processUndefinedHandlerResult($message);
142
    }
143
144
    /**
145
     * @param Message $message
146
     */
147
    protected function processUndefinedHandlerResult(Message $message): void
148
    {
149
        MessageHydrator::changeProperty($message, 'status', new Status(Status::UNDEFINED_HANDLER));
150
        MessageHydrator::changeProperty($message, 'error', 'Could not find any job or processor.');
151
152
        $this->reject($message, true);
153
    }
154
155
    /**
156
     * @param Throwable $exception
157
     * @param Message $message
158
     */
159
    protected function processFailureResult(Throwable $exception, Message $message): void
160
    {
161
        $newStatus = Status::REDELIVERED;
162
163
        $numberOfAttemptsBeforeFailure = $this->config->getNumberOfAttemptsBeforeFailure();
164
165
        if ($message->isJob()) {
166
            $job = $this->config->getJob($message->getEvent());
0 ignored issues
show
Bug introduced by
It seems like $message->getEvent() can also be of type null; however, parameter $jobName of Simple\Queue\Config::getJob() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

166
            $job = $this->config->getJob(/** @scrutinizer ignore-type */ $message->getEvent());
Loading history...
167
            if ($job->attempts()) {
0 ignored issues
show
Bug introduced by
Are you sure the usage of $job->attempts() targeting Simple\Queue\Job::attempts() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
168
                $numberOfAttemptsBeforeFailure = $job->attempts();
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $numberOfAttemptsBeforeFailure is correct as $job->attempts() targeting Simple\Queue\Job::attempts() seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
169
            }
170
        }
171
172
        if (($message->getAttempts() + 1) >= $numberOfAttemptsBeforeFailure) {
173
            $newStatus = Status::FAILURE;
174
        }
175
176
        MessageHydrator::changeProperty($message, 'status', new Status($newStatus));
177
        MessageHydrator::changeProperty($message, 'error', (string)$exception);
178
179
        $this->reject($message, true);
180
    }
181
182
    /**
183
     * @param string $status
184
     * @param Message $message
185
     */
186
    protected function processSuccessResult(string $status, Message $message): void
187
    {
188
        if ($status === self::STATUS_ACK) {
189
            $this->acknowledge($message);
190
191
            return;
192
        }
193
194
        if ($status === self::STATUS_REJECT) {
195
            $this->reject($message);
196
197
            return;
198
        }
199
200
        if ($status === self::STATUS_REQUEUE) {
201
            $this->reject($message, true);
202
203
            return;
204
        }
205
206
        throw new InvalidArgumentException(sprintf('Unsupported result status: "%s".', $status));
207
    }
208
}
209