Test Failed
Push — master ( 6731a8...287bb0 )
by Hirofumi
08:50
created

ConsumeStoredJobService::execute()   F

Complexity

Conditions 17
Paths 227

Size

Total Lines 92

Duplication

Lines 17
Ratio 18.48 %

Code Coverage

Tests 5
CRAP Score 247.5895

Importance

Changes 0
Metric Value
dl 17
loc 92
ccs 5
cts 69
cp 0.0725
rs 3.4112
c 0
b 0
f 0
cc 17
nc 227
nop 3
crap 247.5895

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

- Extract Method

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Closure;
6
use Enqueue\Sqs\SqsMessage;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Shippinno\Job\Application\Job\JobRunnerRegistry;
13
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
14
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
17
use Shippinno\Job\Domain\Model\JobSerializer;
18
use Shippinno\Job\Domain\Model\JobStore;
19
use Shippinno\Job\Domain\Model\StoredJobSerializer;
20
21
/**
 * Receives a single stored-job message from a queue, runs the job through
 * its registered runner and then acknowledges, rejects or requeues the
 * message depending on the outcome.
 */
class ConsumeStoredJobService
{
    use LoggerAwareTrait;

    /**
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var JobSerializer
     */
    private $jobSerializer;

    /**
     * @var JobRunnerRegistry
     */
    private $jobRunnerRegistry;

    /**
     * @var JobStore
     */
    private $jobStore;

    /**
     * @var AbandonedJobMessageStore
     */
    private $abandonedJobMessageStore;

    /**
     * @var JobFlightManager
     */
    private $jobFlightManager;

    /**
     * @param PsrContext $context
     * @param StoredJobSerializer $storedJobSerializer
     * @param JobSerializer $jobSerializer
     * @param JobRunnerRegistry $jobRunnerRegistry
     * @param JobStore $jobStore
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
     * @param JobFlightManager|null $jobFlightManager Defaults to a NullJobFlightManager when omitted.
     * @param LoggerInterface|null $logger Defaults to a NullLogger when omitted.
     */
    public function __construct(
        PsrContext $context,
        StoredJobSerializer $storedJobSerializer,
        JobSerializer $jobSerializer,
        JobRunnerRegistry $jobRunnerRegistry,
        JobStore $jobStore,
        AbandonedJobMessageStore $abandonedJobMessageStore,
        ?JobFlightManager $jobFlightManager = null,
        ?LoggerInterface $logger = null
    ) {
        $this->context = $context;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->jobSerializer = $jobSerializer;
        $this->jobRunnerRegistry = $jobRunnerRegistry;
        $this->jobStore = $jobStore;
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
        $this->jobFlightManager = $jobFlightManager ?? new NullJobFlightManager();
        // Pass the logger itself. The previous `null !== $logger ?: ...` expression
        // evaluated to boolean true whenever a logger was supplied.
        $this->setLogger($logger ?? new NullLogger());
    }

    /**
     * Consumes at most one message from the given queue and processes it.
     *
     * Outcomes:
     *  - no message received: returns without doing anything;
     *  - no runner registered for the job class: the message is stored as
     *    abandoned and rejected;
     *  - job ran and $persist (if given) returned a truthy value: dependent
     *    jobs are appended to the job store and the message is acknowledged;
     *  - job ran but $persist returned a falsy value: the message is requeued;
     *  - job threw JobFailedException: the message is acknowledged when the
     *    job is expendable, abandoned and rejected once the attempt count
     *    reaches the job's maximum, and requeued otherwise.
     *
     * @param string $queueName Name of the queue to consume from.
     * @param Closure|null $persist Called after the job ran and after a message was
     *                              abandoned; a falsy return value requeues the message.
     * @param Closure|null $clear Called before a message that reached its max attempts is abandoned.
     */
    public function execute(string $queueName, ?Closure $persist = null, ?Closure $clear = null): void
    {
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
        $message = $consumer->receive(5000);
        if (null === $message) {
            return;
        }

        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());

        try {
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
        } catch (JobRunnerNotRegisteredException $e) {
            $this->abandonedJobMessageStore->add(
                new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
            );
            $this->logger->alert(
                'No JobRunner is registered. Message is abandoned. Rejecting the message.',
                ['message' => $message->getBody()]
            );
            $this->jobFlightManager->rejected($message->getMessageId());
            $consumer->reject($message);

            return;
        }

        try {
            $jobRunner->run($job);
            if (null !== $persist && !$persist()) {
                $this->requeue($consumer, $message, 'Persistence failed after the job. Requeueing the message.');

                return;
            }
            foreach ($job->dependentJobs() as $dependentJob) {
                $this->jobStore->append($dependentJob);
            }
            $this->logger->info('Acknowledging message.', ['message' => $message->getBody()]);
            $this->jobFlightManager->acknowledged($message->getMessageId());
            $consumer->acknowledge($message);
        } catch (JobFailedException $e) {
            if ($job->isExpendable()) {
                $this->logger->debug(
                    'Expendable job failed. Acknowledging and letting it go.',
                    ['message' => $message->getBody()]
                );
                $this->jobFlightManager->letGo($message->getMessageId());
                $consumer->acknowledge($message);

                return;
            }

            $attempts = $message->getProperty('attempts', 0) + 1;
            if ($attempts >= $job->maxAttempts()) {
                if (null !== $clear) {
                    $clear();
                }
                $this->abandonedJobMessageStore->add(
                    new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
                );
                $this->logger->info(
                    'Rejecting the message reaching the max attempts.',
                    ['message' => $message->getBody()]
                );
                $this->jobFlightManager->rejected($message->getMessageId());
                $consumer->reject($message);
                // NOTE(review): the message has already been rejected above; when
                // $persist fails it is additionally requeued. Kept as-is - confirm intent.
                if (null !== $persist && !$persist()) {
                    $this->requeue($consumer, $message, 'Failed after the job. Requeueing the message.');
                }

                return;
            }

            $message->setProperty('attempts', $attempts);
            if ($job->reattemptDelay() > 0) {
                $message = $this->delayMessage($message, $job->reattemptDelay());
            }
            // These setters are only called on message implementations that provide them.
            if (method_exists($message, 'setMessageDeduplicationId')) {
                $message->setMessageDeduplicationId(uniqid());
            }
            if (method_exists($message, 'setMessageGroupId')) {
                $message->setMessageGroupId($storedJob->fifoGroupId() ?? uniqid());
            }
            $this->requeue($consumer, $message, 'Requeueing the message.');
        }
    }

    /**
     * Logs the given text at info level, reports the message as requeued to the
     * flight manager and rejects it on the consumer with the requeue flag set.
     *
     * @param mixed $consumer The consumer the message was received from.
     * @param PsrMessage $message
     * @param string $logMessage Text written to the log.
     */
    private function requeue($consumer, PsrMessage $message, string $logMessage): void
    {
        $this->logger->info($logMessage, ['message' => $message->getBody()]);
        $this->jobFlightManager->requeued($message->getMessageId());
        $consumer->reject($message, true);
    }

    /**
     * Sets the delay on the message when it is an SqsMessage; any other
     * message type is returned unchanged.
     *
     * @param PsrMessage $message
     * @param int $delay Value passed to SqsMessage::setDelaySeconds().
     * @return PsrMessage
     */
    protected function delayMessage(PsrMessage $message, int $delay)
    {
        if ($message instanceof SqsMessage) {
            $message->setDelaySeconds($delay);
        }

        return $message;
    }
}
201