Issues (9)

src/Consumer.php (5 issues)

Labels
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue;
6
7
use Throwable;
8
use InvalidArgumentException;
9
use Simple\Queue\Transport\TransportInterface;
10
11
/**
12
 * Class Consumer
13
 *
14
 * TODO: 1 - create job instance
15
 * TODO: 2 - move db operations to other class
16
 * TODO: 3 - dispatch job
17
 * TODO: 4 - deserialize data to job and processor
18
 *
19
 * @package Simple\Queue
20
 */
21
class Consumer
{
    /** Result status: the message was processed successfully and is removed from the queue. */
    public const STATUS_ACK = 'ACK';

    /** Result status: the message is removed from the queue without being redelivered. */
    public const STATUS_REJECT = 'REJECT';

    /** Result status: the message is removed from the queue and a redelivery message is sent. */
    public const STATUS_REQUEUE = 'REQUEUE';

    /** @var TransportInterface */
    protected TransportInterface $transport;

    /** @var Producer */
    protected Producer $producer;

    /**
     * Always set by the constructor, so it is declared non-nullable.
     *
     * @var Config
     */
    private Config $config;

    /**
     * Consumer constructor.
     *
     * @param TransportInterface $transport
     * @param Producer $producer
     * @param Config|null $config When omitted, Config::getDefault() is used.
     */
    public function __construct(TransportInterface $transport, Producer $producer, ?Config $config = null)
    {
        $this->transport = $transport;
        $this->producer = $producer;
        // NOTE(review): assumes Config::getDefault() never returns null - confirm.
        $this->config = $config ?? Config::getDefault();
    }

    /**
     * The message has been successfully processed and will be removed from the queue.
     *
     * @param Message $message
     */
    public function acknowledge(Message $message): void
    {
        $this->transport->deleteMessage($message);
    }

    /**
     * Removes the message from the queue and, when requested, sends a redelivery message for it.
     *
     * @param Message $message
     * @param bool $requeue Whether a redelivery message built by the producer should be sent.
     */
    public function reject(Message $message, bool $requeue = false): void
    {
        $this->acknowledge($message);

        if (!$requeue) {
            return;
        }

        $redeliveryMessage = $this->producer->makeRedeliveryMessage($message);
        $this->producer->send($redeliveryMessage);
    }

    /**
     * Initialises the transport and then polls it for messages in an endless loop.
     *
     * TODO: pass the processing message status to $eachCallback
     *
     * @param array $queues Passed as-is to the transport when fetching messages.
     * @param callable|null $eachCallback Invoked after every fetched message with the message and
     *                                    the Throwable that escaped its processing (null if none).
     */
    public function consume(array $queues = [], ?callable $eachCallback = null): void
    {
        $this->transport->init();

        while (true) {
            $message = $this->transport->fetchMessage($queues);

            if (!$message) {
                usleep(200000); // 0.2 second
                continue;
            }

            // Tracked per iteration so that a failure of an earlier message is never
            // reported to the callback for a later, successfully processed one.
            $failure = null;

            try {
                $this->processing($message);
            } catch (Throwable $throwable) {
                $failure = $throwable;

                try {
                    $this->processFailureResult($throwable, $message);
                } catch (\Doctrine\DBAL\Exception $exception) {
                    // maybe lucky later
                }
            }

            if ($eachCallback !== null) {
                $eachCallback($message, $failure);
            }
        }
    }

    /**
     * Marks the message as being in process and dispatches it to its job or queue processor.
     *
     * Any Throwable raised while handling is routed to processFailureResult(). Messages with
     * neither a job nor a registered processor go to processUndefinedHandlerResult().
     *
     * @param Message $message
     */
    protected function processing(Message $message): void
    {
        $this->transport->changeMessageStatus($message, new Status(Status::IN_PROCESS));

        if ($message->isJob()) {
            try {
                $event = $message->getEvent();
                if ($event === null) {
                    // Fail with an explicit reason instead of a TypeError from getJob().
                    throw new InvalidArgumentException('Job message does not contain an event name.');
                }

                $job = $this->config->getJob($event);
                $result = $job->handle($this->getContext($message));
                $this->processSuccessResult($result, $message);
            } catch (Throwable $exception) {
                $this->processFailureResult($exception, $message);
            }

            return;
        }

        if ($this->config->hasProcessor($message->getQueue())) {
            try {
                $processor = $this->config->getProcessor($message->getQueue());
                $result = $processor($this->getContext($message));
                $this->processSuccessResult($result, $message);
            } catch (Throwable $exception) {
                $this->processFailureResult($exception, $message);
            }

            return;
        }

        $this->processUndefinedHandlerResult($message);
    }

    /**
     * Flags the message as having no handler and rejects it with requeue.
     *
     * @param Message $message
     */
    protected function processUndefinedHandlerResult(Message $message): void
    {
        MessageHydrator::changeProperty($message, 'status', new Status(Status::UNDEFINED_HANDLER));
        MessageHydrator::changeProperty($message, 'error', 'Could not find any job or processor.');

        $this->reject($message, true);
    }

    /**
     * Records a processing failure on the message and rejects it with requeue.
     *
     * The status becomes Status::FAILURE once the next attempt would reach the allowed number
     * of attempts, otherwise Status::REDELIVERED. For job messages a truthy Job::attempts()
     * value overrides the configured limit.
     *
     * @param Throwable $exception Stored on the message as its string representation.
     * @param Message $message
     */
    protected function processFailureResult(Throwable $exception, Message $message): void
    {
        $attemptsLimit = $this->config->getNumberOfAttemptsBeforeFailure();

        if ($message->isJob()) {
            $jobAttempts = $this->resolveJobAttempts($message);
            if ($jobAttempts) {
                $attemptsLimit = $jobAttempts;
            }
        }

        $newStatus = Status::REDELIVERED;
        if (($message->getAttempts() + 1) >= $attemptsLimit) {
            $newStatus = Status::FAILURE;
        }

        MessageHydrator::changeProperty($message, 'status', new Status($newStatus));
        MessageHydrator::changeProperty($message, 'error', (string)$exception);

        $this->reject($message, true);
    }

    /**
     * Applies the status returned by a job or processor to the message.
     *
     * @param string $status One of the STATUS_* constants of this class.
     * @param Message $message
     *
     * @throws InvalidArgumentException When the status is not a known STATUS_* value.
     */
    protected function processSuccessResult(string $status, Message $message): void
    {
        if ($status === self::STATUS_ACK) {
            $this->acknowledge($message);

            return;
        }

        if ($status === self::STATUS_REJECT || $status === self::STATUS_REQUEUE) {
            $this->reject($message, $status === self::STATUS_REQUEUE);

            return;
        }

        throw new InvalidArgumentException(sprintf('Unsupported result status: "%s".', $status));
    }

    /**
     * Builds the context handed to jobs and processors from the deserialized message body.
     *
     * @param Message $message
     * @return Context
     */
    protected function getContext(Message $message): Context
    {
        $data = $this->config->getSerializer()->deserialize($message->getBody());

        // Context receives an array; a non-array payload is wrapped in one.
        if (!is_array($data)) {
            $data = [$data];
        }

        return new Context($this->producer, $message, $data);
    }

    /**
     * Looks up the per-job attempts limit for a job message without ever throwing.
     *
     * Called while a failure is already being handled, so a missing event name or a job that
     * cannot be resolved must not mask the original error; null is returned instead and the
     * caller keeps the configured default.
     *
     * @param Message $message
     * @return mixed The value of Job::attempts(), or null when it cannot be determined.
     */
    private function resolveJobAttempts(Message $message)
    {
        $event = $message->getEvent();
        if ($event === null) {
            return null;
        }

        try {
            return $this->config->getJob($event)->attempts();
        } catch (Throwable $lookupFailure) {
            return null;
        }
    }
}
221