Passed
Push — master ( 1069fe...3696ab )
by Hirofumi
10:52
created

ConsumeStoredJobService::execute()   D

Complexity

Conditions 17
Paths 175

Size

Total Lines 84

Duplication

Lines 18
Ratio 21.43 %

Code Coverage

Tests 56
CRAP Score 17.1593

Importance

Changes 0
Metric Value
dl 18
loc 84
ccs 56
cts 61
cp 0.918
rs 4.0224
c 0
b 0
f 0
cc 17
nc 175
nop 3
crap 17.1593

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Closure;
6
use Enqueue\Sqs\SqsMessage;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Shippinno\Job\Application\Job\JobRunnerRegistry;
13
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
14
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
17
use Shippinno\Job\Domain\Model\JobSerializer;
18
use Shippinno\Job\Domain\Model\JobStore;
19
use Shippinno\Job\Domain\Model\StoredJobSerializer;
20
21
class ConsumeStoredJobService
22
{
23
    use LoggerAwareTrait;
24
25
    /**
26
     * @var StoredJobSerializer
27
     */
28
    private $storedJobSerializer;
29
30
    /**
31
     * @var PsrContext
32
     */
33
    private $context;
34
35
    /**
36
     * @var JobSerializer
37
     */
38
    private $jobSerializer;
39
40
    /**
41
     * @var JobRunnerRegistry
42
     */
43
    private $jobRunnerRegistry;
44
45
    /**
46
     * @var JobStore
47
     */
48
    private $jobStore;
49
50
    /**
51
     * @var AbandonedJobMessageStore
52
     */
53
    private $abandonedJobMessageStore;
54
55
    /**
56
     * @param PsrContext $context
57
     * @param StoredJobSerializer $storedJobSerializer
58
     * @param JobSerializer $jobSerializer
59
     * @param JobRunnerRegistry $jobRunnerRegistry
60
     * @param JobStore $jobStore
61
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
62
     * @param LoggerInterface|null $logger
63
     */
64 8
    public function __construct(
65
        PsrContext $context,
66
        StoredJobSerializer $storedJobSerializer,
67
        JobSerializer $jobSerializer,
68
        JobRunnerRegistry $jobRunnerRegistry,
69
        JobStore $jobStore,
70
        AbandonedJobMessageStore $abandonedJobMessageStore,
71
        LoggerInterface $logger = null
72
    ) {
73 8
        $this->context = $context;
74 8
        $this->storedJobSerializer = $storedJobSerializer;
75 8
        $this->jobSerializer = $jobSerializer;
76 8
        $this->jobRunnerRegistry = $jobRunnerRegistry;
77 8
        $this->jobStore = $jobStore;
78 8
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
79 8
        $this->setLogger(null !== $logger ? $logger : new NullLogger);
80 8
    }
81
82
    /**
83
     * @param string $queueName
84
     * @param Closure|null $persist
85
     */
86 8
    public function execute(string $queueName, Closure $persist = null, Closure $clear = null): void
87
    {
88 8
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
89 8
        $message = $consumer->receive(5000);
90 8
        if (null === $message) {
91 1
            return;
92
        }
93 7
        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
94 7
        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());
95
        try {
96 7
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
97 1
        } catch (JobRunnerNotRegisteredException $e) {
98 1
            $this->abandonedJobMessageStore->add(
99 1
                new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
100
            );
101 1
            $this->logger->alert(
102 1
                'No JobRunner is registered. Message is abandoned. Rejecting the message.',
103 1
                ['message' => $message->getBody()]
104
            );
105 1
            $consumer->reject($message);
106 1
            return;
107
        }
108
        try {
109 6
            $jobRunner->run($job);
110 3 View Code Duplication
            if (!is_null($persist) && !$persist()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
111 1
                $this->logger->info(
112 1
                    'Persistence failed after the job. Requeueing the message.',
113 1
                    ['message' => $message->getBody()]
114
                );
115 1
                $consumer->reject($message, true);
116 1
                return;
117
            }
118 2
            $dependentJobs = $job->dependentJobs();
119 2
            if (count($dependentJobs) > 0) {
120 1
                foreach ($dependentJobs as $dependentJob) {
121 1
                    $this->jobStore->append($dependentJob);
122
                }
123
            }
124 2
            $this->logger->info('Acknowledging message.', ['message' => $message->getBody()]);
125 2
            $consumer->acknowledge($message);
126 3
        } catch (JobFailedException $e) {
127 3
            if ($job->isExpendable()) {
128 1
                $this->logger->debug(
129 1
                    'Expendable job failed. Letting it go.',
130 1
                    ['message' => $message->getBody()]
131
                );
132 1
                return;
133
            }
134 2
            $attempts = $message->getProperty('attempts', 0) + 1;
135 2
            if ($attempts >= $job->maxAttempts()) {
136 1
                if (!is_null($clear)) {
137
                    $clear();
138
                }
139 1
                $this->abandonedJobMessageStore->add(
140 1
                    new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
141
                );
142 1
                $this->logger->info(
143 1
                    'Rejecting the message reaching the max attempts.',
144 1
                    ['message' => $message->getBody()]
145
                );
146 1
                $consumer->reject($message);
147 1 View Code Duplication
                if (!is_null($persist) && !$persist()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
148
                    $this->logger->info(
149
                        'Failed after the job. Requeueing the message.',
150
                        ['message' => $message->getBody()]
151
                    );
152
                    $consumer->reject($message, true);
153
                }
154 1
                return;
155
            }
156 1
            $message->setProperty('attempts', $attempts);
157 1
            if ($job->reattemptDelay() > 0) {
158 1
                $message = $this->delayMessage($message, $job->reattemptDelay());
159
            }
160 1
            if (method_exists($message, 'setMessageDeduplicationId')) {
161 1
                $message->setMessageDeduplicationId(uniqid());
162
            }
163 1 View Code Duplication
            if (method_exists($message, 'setMessageGroupId')) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
164 1
                $message->setMessageGroupId(is_null($storedJob->fifoGroupId()) ? uniqid() : $storedJob->fifoGroupId());
165
            }
166 1
            $this->logger->info('Requeueing the message.', ['message' => $message->getBody()]);
167 1
            $consumer->reject($message, true);
168
        }
169 3
    }
170
171
    /**
172
     * @param PsrMessage $message
173
     * @param int $delay
174
     * @return PsrMessage
175
     */
176 1
    protected function delayMessage(PsrMessage $message, int $delay)
177
    {
178 1
        if ($message instanceof SqsMessage) {
179 1
            $message->setDelaySeconds($delay);
180
        }
181
182 1
        return $message;
183
    }
184
}
185