Test Failed
Push — master ( 7d30ec...97ddd4 )
by Hirofumi
02:25
created

ConsumeStoredJobService::execute()   F

Complexity

Conditions 18
Paths 505

Size

Total Lines 103

Duplication

Lines 21
Ratio 20.39 %

Code Coverage

Tests 5
CRAP Score 284.2849

Importance

Changes 0
Metric Value
dl 21
loc 103
ccs 5
cts 79
cp 0.0633
rs 1.11
c 0
b 0
f 0
cc 18
nc 505
nop 3
crap 284.2849

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

- Extract Method

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Closure;
6
use Enqueue\Sqs\SqsMessage;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Shippinno\Job\Application\Job\JobRunnerRegistry;
13
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
14
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
17
use Shippinno\Job\Domain\Model\JobSerializer;
18
use Shippinno\Job\Domain\Model\JobStore;
19
use Shippinno\Job\Domain\Model\StoredJobSerializer;
20
21
/**
 * Receives a single stored-job message from a queue, runs the matching job
 * runner and then acknowledges, requeues or rejects the message depending on
 * the outcome.
 */
class ConsumeStoredJobService
{
    use LoggerAwareTrait;

    /**
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var JobSerializer
     */
    private $jobSerializer;

    /**
     * @var JobRunnerRegistry
     */
    private $jobRunnerRegistry;

    /**
     * @var JobStore
     */
    private $jobStore;

    /**
     * @var AbandonedJobMessageStore
     */
    private $abandonedJobMessageStore;

    /**
     * @var JobFlightManager
     */
    private $jobFlightManager;

    /**
     * @param PsrContext $context
     * @param StoredJobSerializer $storedJobSerializer
     * @param JobSerializer $jobSerializer
     * @param JobRunnerRegistry $jobRunnerRegistry
     * @param JobStore $jobStore
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
     * @param JobFlightManager|null $jobFlightManager Falls back to a NullJobFlightManager when omitted.
     * @param LoggerInterface|null $logger Falls back to a NullLogger when omitted.
     */
    public function __construct(
        PsrContext $context,
        StoredJobSerializer $storedJobSerializer,
        JobSerializer $jobSerializer,
        JobRunnerRegistry $jobRunnerRegistry,
        JobStore $jobStore,
        AbandonedJobMessageStore $abandonedJobMessageStore,
        ?JobFlightManager $jobFlightManager = null,
        ?LoggerInterface $logger = null
    ) {
        $this->context = $context;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->jobSerializer = $jobSerializer;
        $this->jobRunnerRegistry = $jobRunnerRegistry;
        $this->jobStore = $jobStore;
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
        $this->jobFlightManager = $jobFlightManager ?? new NullJobFlightManager();
        $this->setLogger($logger ?? new NullLogger());
    }

    /**
     * Consumes at most one message from the given queue and processes it.
     *
     * Outcomes:
     *  - no message received: returns without doing anything;
     *  - no runner registered for the job: the message is recorded as abandoned and rejected;
     *  - job succeeded: $persist (if given) is invoked; a falsy result requeues the message,
     *    otherwise dependent jobs are appended to the job store and the message is acknowledged;
     *  - job failed (JobFailedException): expendable jobs are acknowledged and dropped; jobs that
     *    reached their max attempts are recorded as abandoned and rejected; any other job is
     *    requeued with its "attempts" property incremented.
     *
     * @param string $queueName Name of the queue to consume from.
     * @param Closure|null $persist Invoked after the job ran (and after a max-attempts rejection);
     *                              a falsy return value causes the message to be requeued.
     * @param Closure|null $clear Invoked before a message that reached its max attempts is abandoned.
     */
    public function execute(string $queueName, ?Closure $persist = null, ?Closure $clear = null): void
    {
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
        // NOTE(review): 5000 is the receive timeout; presumably milliseconds per queue-interop — confirm.
        $message = $consumer->receive(5000);
        if (null === $message) {
            return;
        }

        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
        if (null === $message->getMessageId()) {
            // The flight manager is keyed by message ID, so fall back to the stored job's ID.
            $this->logger->alert('Message without ID. Filling it.', ['message' => $message->getBody()]);
            $message->setMessageId($storedJob->id());
        }
        $this->jobFlightManager->arrived($message->getMessageId());

        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());
        try {
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
        } catch (JobRunnerNotRegisteredException $e) {
            $this->abandon($queueName, $message, $e);
            $this->logger->alert(
                'No JobRunner is registered. Message is abandoned. Rejecting the message.',
                ['message' => $message->getBody()]
            );
            $this->jobFlightManager->rejected($message->getMessageId());
            $consumer->reject($message);

            return;
        }

        try {
            $jobRunner->run($job);
            if (null !== $persist && !$persist()) {
                $this->requeue(
                    $consumer,
                    $message,
                    'Persistence failed after the job. Requeueing the message.'
                );

                return;
            }
            foreach ($job->dependentJobs() as $dependentJob) {
                $this->jobStore->append($dependentJob);
            }
            $this->logger->info('Acknowledging message.', ['message' => $message->getBody()]);
            $this->jobFlightManager->acknowledged($message->getMessageId());
            $consumer->acknowledge($message);
        } catch (JobFailedException $e) {
            if ($job->isExpendable()) {
                $this->logger->debug(
                    'Expendable job failed. Acknowledging and letting it go.',
                    ['message' => $message->getBody()]
                );
                $this->jobFlightManager->letGo($message->getMessageId());
                $consumer->acknowledge($message);

                return;
            }

            $attempts = $message->getProperty('attempts', 0) + 1;
            if ($attempts >= $job->maxAttempts()) {
                if (null !== $clear) {
                    $clear();
                }
                $this->abandon($queueName, $message, $e);
                $this->logger->info(
                    'Rejecting the message reaching the max attempts.',
                    ['message' => $message->getBody()]
                );
                $this->jobFlightManager->rejected($message->getMessageId());
                $consumer->reject($message);
                // NOTE(review): when $persist fails here the already rejected message is rejected
                // a second time with requeue=true — confirm the transport tolerates this.
                if (null !== $persist && !$persist()) {
                    $this->requeue($consumer, $message, 'Failed after the job. Requeueing the message.');
                }

                return;
            }

            $message->setProperty('attempts', $attempts);
            if ($job->reattemptDelay() > 0) {
                $message = $this->delayMessage($message, $job->reattemptDelay());
            }
            // These setters are not part of the message interface used here, so they are
            // applied only when the concrete message class provides them.
            if (method_exists($message, 'setMessageDeduplicationId')) {
                $message->setMessageDeduplicationId(uniqid());
            }
            if (method_exists($message, 'setMessageGroupId')) {
                $message->setMessageGroupId($storedJob->fifoGroupId() ?? uniqid());
            }
            $this->requeue($consumer, $message, 'Requeueing the message.');
        }
    }

    /**
     * Applies a delay to the message when it is an SqsMessage; any other message is
     * returned untouched.
     *
     * @param PsrMessage $message
     * @param int $delay Value passed to SqsMessage::setDelaySeconds().
     * @return PsrMessage
     */
    protected function delayMessage(PsrMessage $message, int $delay)
    {
        if ($message instanceof SqsMessage) {
            $message->setDelaySeconds($delay);
        }

        return $message;
    }

    /**
     * Logs the given text, gives the message a new ID (the current ID suffixed with "+"),
     * reports the ID change to the flight manager and rejects the message with requeue enabled.
     *
     * @param \Interop\Queue\PsrConsumer $consumer Consumer the message was received from.
     * @param PsrMessage $message Message to requeue; its ID is changed in place.
     * @param string $logMessage Text written to the info log.
     */
    private function requeue(\Interop\Queue\PsrConsumer $consumer, PsrMessage $message, string $logMessage): void
    {
        $this->logger->info($logMessage, ['message' => $message->getBody()]);
        $newMessageId = $message->getMessageId() . '+';
        $this->jobFlightManager->requeued($message->getMessageId(), $newMessageId);
        $message->setMessageId($newMessageId);
        $consumer->reject($message, true);
    }

    /**
     * Records the message body in the abandoned job message store together with the
     * string form of the exception that caused it to be abandoned.
     *
     * @param string $queueName Queue the message was consumed from.
     * @param PsrMessage $message
     * @param \Throwable $reason
     */
    private function abandon(string $queueName, PsrMessage $message, \Throwable $reason): void
    {
        $this->abandonedJobMessageStore->add(
            new AbandonedJobMessage($queueName, $message->getBody(), (string) $reason)
        );
    }
}
212