Test Failed
Push — master ( 575e5f...bb3910 )
by Hirofumi
02:01
created

ConsumeStoredJobService::execute()   F

Complexity

Conditions 17
Paths 253

Size

Total Lines 98

Duplication

Lines 21
Ratio 21.43 %

Code Coverage

Tests 35
CRAP Score 60.8341

Importance

Changes 0
Metric Value
dl 21
loc 98
ccs 35
cts 75
cp 0.4667
rs 3.0203
c 0
b 0
f 0
cc 17
nc 253
nop 3
crap 60.8341

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

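Commonly applied refactorings include Extract Method; if many parameters or temporary variables are present, also consider Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.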

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Closure;
6
use Enqueue\Sqs\SqsMessage;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Shippinno\Job\Application\Job\JobRunnerRegistry;
13
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
14
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
17
use Shippinno\Job\Domain\Model\JobSerializer;
18
use Shippinno\Job\Domain\Model\JobStore;
19
use Shippinno\Job\Domain\Model\StoredJobSerializer;
20
21
/**
 * Receives a single stored-job message from a queue, runs the job through its
 * registered runner, and then acknowledges, requeues, or rejects the message
 * depending on the outcome.
 */
class ConsumeStoredJobService
{
    use LoggerAwareTrait;

    /**
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var JobSerializer
     */
    private $jobSerializer;

    /**
     * @var JobRunnerRegistry
     */
    private $jobRunnerRegistry;

    /**
     * @var JobStore
     */
    private $jobStore;

    /**
     * @var AbandonedJobMessageStore
     */
    private $abandonedJobMessageStore;

    /**
     * @var JobFlightManager
     */
    private $jobFlightManager;

    /**
     * @param PsrContext $context
     * @param StoredJobSerializer $storedJobSerializer
     * @param JobSerializer $jobSerializer
     * @param JobRunnerRegistry $jobRunnerRegistry
     * @param JobStore $jobStore
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
     * @param JobFlightManager|null $jobFlightManager Defaults to a NullJobFlightManager when omitted.
     * @param LoggerInterface|null $logger Defaults to a NullLogger when omitted.
     */
    public function __construct(
        PsrContext $context,
        StoredJobSerializer $storedJobSerializer,
        JobSerializer $jobSerializer,
        JobRunnerRegistry $jobRunnerRegistry,
        JobStore $jobStore,
        AbandonedJobMessageStore $abandonedJobMessageStore,
        ?JobFlightManager $jobFlightManager = null,
        ?LoggerInterface $logger = null
    ) {
        $this->context = $context;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->jobSerializer = $jobSerializer;
        $this->jobRunnerRegistry = $jobRunnerRegistry;
        $this->jobStore = $jobStore;
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
        $this->jobFlightManager = $jobFlightManager ?? new NullJobFlightManager();
        $this->setLogger($logger ?? new NullLogger());
    }

    /**
     * Consumes at most one message from the given queue and processes its job.
     *
     * Returns silently when no message arrives within the receive timeout (5000).
     *
     * @param string $queueName Name of the queue to consume from.
     * @param Closure|null $persist Optional callback invoked after the job ran (and after a message
     *                              was abandoned for reaching its max attempts); a falsy return
     *                              value causes the message to be requeued.
     * @param Closure|null $clear Optional callback invoked before a message is abandoned for
     *                            reaching its max attempts.
     */
    public function execute(string $queueName, ?Closure $persist = null, ?Closure $clear = null): void
    {
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
        $message = $consumer->receive(5000);
        if (null === $message) {
            return;
        }

        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());

        try {
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
        } catch (JobRunnerNotRegisteredException $e) {
            $this->storeAbandonedMessage($queueName, $message, $e);
            $this->logger->alert(
                'No JobRunner is registered. Message is abandoned. Rejecting the message.',
                ['message' => $message->getBody()]
            );
            $this->jobFlightManager->rejected($message->getMessageId());
            $consumer->reject($message);

            return;
        }

        try {
            $jobRunner->run($job);
            // Kept inside the try block so that a JobFailedException raised while
            // finishing up is handled the same way as one raised by the runner.
            $this->finishJob($consumer, $message, $job, $persist);
        } catch (JobFailedException $e) {
            if ($job->isExpendable()) {
                $this->letGo($consumer, $message);

                return;
            }

            $attempts = $message->getProperty('attempts', 0) + 1;
            if ($attempts >= $job->maxAttempts()) {
                $this->giveUp($queueName, $consumer, $message, $e, $persist, $clear);

                return;
            }

            $message->setProperty('attempts', $attempts);
            $this->reattempt($consumer, $message, $job, $storedJob);
        }
    }

    /**
     * Applies the delay to the message when the transport supports it.
     *
     * Only SqsMessage instances are modified; any other message is returned untouched.
     *
     * @param PsrMessage $message
     * @param int $delay Value handed to SqsMessage::setDelaySeconds().
     * @return PsrMessage
     */
    protected function delayMessage(PsrMessage $message, int $delay)
    {
        if ($message instanceof SqsMessage) {
            $message->setDelaySeconds($delay);
        }

        return $message;
    }

    /**
     * Wraps up a job that ran without failing: persists, stores dependent jobs, acknowledges.
     *
     * When persistence fails the message is requeued and nothing else happens.
     *
     * @param object $consumer Consumer the message was received from.
     * @param PsrMessage $message
     * @param object $job The deserialized job that has just been run.
     * @param Closure|null $persist
     */
    private function finishJob($consumer, PsrMessage $message, $job, ?Closure $persist): void
    {
        if ($this->persistenceFailed($persist)) {
            $this->requeue($consumer, $message, 'Persistence failed after the job. Requeueing the message.');

            return;
        }

        foreach ($job->dependentJobs() as $dependentJob) {
            $this->jobStore->append($dependentJob);
        }

        $this->logger->info('Acknowledging message.', ['message' => $message->getBody()]);
        $this->jobFlightManager->acknowledged($message->getMessageId());
        $consumer->acknowledge($message);
    }

    /**
     * Acknowledges the message of a failed expendable job without retrying it.
     *
     * @param object $consumer Consumer the message was received from.
     * @param PsrMessage $message
     */
    private function letGo($consumer, PsrMessage $message): void
    {
        $this->logger->debug(
            'Expendable job failed. Acknowledging and letting it go.',
            ['message' => $message->getBody()]
        );
        $this->jobFlightManager->letGo($message->getMessageId());
        $consumer->acknowledge($message);
    }

    /**
     * Abandons and rejects a message whose job has reached its max attempts.
     *
     * @param string $queueName
     * @param object $consumer Consumer the message was received from.
     * @param PsrMessage $message
     * @param JobFailedException $e The failure that exhausted the attempts.
     * @param Closure|null $persist
     * @param Closure|null $clear
     */
    private function giveUp(
        string $queueName,
        $consumer,
        PsrMessage $message,
        JobFailedException $e,
        ?Closure $persist,
        ?Closure $clear
    ): void {
        if (null !== $clear) {
            $clear();
        }

        $this->storeAbandonedMessage($queueName, $message, $e);
        $this->logger->info(
            'Rejecting the message reaching the max attempts.',
            ['message' => $message->getBody()]
        );
        $this->jobFlightManager->rejected($message->getMessageId());
        $consumer->reject($message);

        // NOTE(review): the message has already been rejected above; on a persistence
        // failure it is rejected a second time with the requeue flag. Order kept as it
        // was — confirm the transport tolerates the double reject.
        if ($this->persistenceFailed($persist)) {
            $this->requeue($consumer, $message, 'Failed after the job. Requeueing the message.');
        }
    }

    /**
     * Prepares a failed message for another attempt and requeues it.
     *
     * @param object $consumer Consumer the message was received from.
     * @param PsrMessage $message Message whose 'attempts' property has already been updated.
     * @param object $job The job that failed; supplies the reattempt delay.
     * @param object $storedJob The stored job; supplies the FIFO group id, if any.
     */
    private function reattempt($consumer, PsrMessage $message, $job, $storedJob): void
    {
        $delay = $job->reattemptDelay();
        if ($delay > 0) {
            $message = $this->delayMessage($message, $delay);
        }

        // These setters exist only on some message implementations, hence the capability checks.
        if (method_exists($message, 'setMessageDeduplicationId')) {
            $message->setMessageDeduplicationId(uniqid());
        }
        if (method_exists($message, 'setMessageGroupId')) {
            $message->setMessageGroupId($storedJob->fifoGroupId() ?? uniqid());
        }

        $this->requeue($consumer, $message, 'Requeueing the message.');
    }

    /**
     * Logs the reason, gives the message a new id (the current id suffixed with '+'),
     * reports the id change to the flight manager, and rejects the message with requeue.
     *
     * @param object $consumer Consumer the message was received from.
     * @param PsrMessage $message
     * @param string $reason Log line written at info level before requeueing.
     */
    private function requeue($consumer, PsrMessage $message, string $reason): void
    {
        $this->logger->info($reason, ['message' => $message->getBody()]);

        $currentMessageId = $message->getMessageId();
        $newMessageId = $currentMessageId . '+';
        $this->jobFlightManager->requeued($currentMessageId, $newMessageId);
        $message->setMessageId($newMessageId);
        $consumer->reject($message, true);
    }

    /**
     * Records the message body and the stringified cause in the abandoned-message store.
     *
     * @param string $queueName
     * @param PsrMessage $message
     * @param \Throwable $cause
     */
    private function storeAbandonedMessage(string $queueName, PsrMessage $message, \Throwable $cause): void
    {
        $this->abandonedJobMessageStore->add(
            new AbandonedJobMessage($queueName, $message->getBody(), (string) $cause)
        );
    }

    /**
     * Invokes the persist callback, if any, and tells whether it reported a failure.
     *
     * @param Closure|null $persist
     * @return bool True only when a callback was given and it returned a falsy value.
     */
    private function persistenceFailed(?Closure $persist): bool
    {
        return null !== $persist && !$persist();
    }
}
207