Test Failed
Push — master ( bb3910...ad28d8 )
by Hirofumi
02:00
created

ConsumeStoredJobService   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 190
Duplicated Lines 11.05 %

Coupling/Cohesion

Components 1
Dependencies 19

Test Coverage

Coverage 54.35%

Importance

Changes 0
Metric Value
wmc 23
lcom 1
cbo 19
dl 21
loc 190
ccs 50
cts 92
cp 0.5435
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 19 3
F execute() 21 102 18
A delayMessage() 0 8 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Closure;
6
use Enqueue\Sqs\SqsMessage;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Shippinno\Job\Application\Job\JobRunnerRegistry;
13
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
14
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
17
use Shippinno\Job\Domain\Model\JobSerializer;
18
use Shippinno\Job\Domain\Model\JobStore;
19
use Shippinno\Job\Domain\Model\StoredJobSerializer;
20
21
class ConsumeStoredJobService
22
{
23
    use LoggerAwareTrait;
24
25
    /**
26
     * @var StoredJobSerializer
27
     */
28
    private $storedJobSerializer;
29
30
    /**
31
     * @var PsrContext
32
     */
33
    private $context;
34
35
    /**
36
     * @var JobSerializer
37
     */
38
    private $jobSerializer;
39
40
    /**
41
     * @var JobRunnerRegistry
42
     */
43
    private $jobRunnerRegistry;
44
45
    /**
46
     * @var JobStore
47
     */
48
    private $jobStore;
49
50
    /**
51
     * @var AbandonedJobMessageStore
52
     */
53
    private $abandonedJobMessageStore;
54
55
    /**
56
     * @var JobFlightManager
57
     */
58
    private $jobFlightManager;
59
60
    /**
61
     * @param PsrContext $context
62
     * @param StoredJobSerializer $storedJobSerializer
63
     * @param JobSerializer $jobSerializer
64
     * @param JobRunnerRegistry $jobRunnerRegistry
65
     * @param JobStore $jobStore
66
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
67
     * @param JobFlightManager|null $jobFlightManager
68
     * @param LoggerInterface|null $logger
69
     */
70 3
    public function __construct(
71
        PsrContext $context,
72
        StoredJobSerializer $storedJobSerializer,
73
        JobSerializer $jobSerializer,
74
        JobRunnerRegistry $jobRunnerRegistry,
75
        JobStore $jobStore,
76
        AbandonedJobMessageStore $abandonedJobMessageStore,
77
        JobFlightManager $jobFlightManager = null,
78
        LoggerInterface $logger = null
79
    ) {
80 3
        $this->context = $context;
81 3
        $this->storedJobSerializer = $storedJobSerializer;
82 3
        $this->jobSerializer = $jobSerializer;
83 3
        $this->jobRunnerRegistry = $jobRunnerRegistry;
84 3
        $this->jobStore = $jobStore;
85 3
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
86 3
        $this->jobFlightManager = $jobFlightManager ?: new NullJobFlightManager;
87 3
        $this->setLogger($logger ?: new NullLogger);
88 3
    }
89
90
    /**
91
     * @param string $queueName
92
     * @param Closure|null $persist
93
     */
94 3
    public function execute(string $queueName, Closure $persist = null, Closure $clear = null): void
95
    {
96 3
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
97 3
        $message = $consumer->receive(5000);
98 3
        if (null === $message) {
99 1
            return;
100
        }
101 2
        if (null === $message->getMessageId()) {
102
            $this->logger->alert('Message without ID.', ['message' => $message->getBody()]);
103
            return;
104
        }
105 2
        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
106 2
        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());
107
        try {
108 2
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
109
        } catch (JobRunnerNotRegisteredException $e) {
110
            $this->abandonedJobMessageStore->add(
111
                new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
112
            );
113
            $this->logger->alert(
114
                'No JobRunner is registered. Message is abandoned. Rejecting the message.',
115
                ['message' => $message->getBody()]
116
            );
117
            $this->jobFlightManager->rejected($message->getMessageId());
118
            $consumer->reject($message);
119
            return;
120
        }
121
        try {
122 2
            $jobRunner->run($job);
123 1 View Code Duplication
            if (!is_null($persist) && !$persist()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
124 1
                $this->logger->info(
125 1
                    'Persistence failed after the job. Requeueing the message.',
126 1
                    ['message' => $message->getBody()]
127
                );
128 1
                $newMessageId = $message->getMessageId() . '+';
129 1
                $this->jobFlightManager->requeued($message->getMessageId(), $newMessageId);
130 1
                $message->setMessageId($newMessageId);
131 1
                $consumer->reject($message, true);
132 1
                return;
133
            }
134
            $dependentJobs = $job->dependentJobs();
135
            if (count($dependentJobs) > 0) {
136
                foreach ($dependentJobs as $dependentJob) {
137
                    $this->jobStore->append($dependentJob);
138
                }
139
            }
140
            $this->logger->info('Acknowledging message.', ['message' => $message->getBody()]);
141
            $this->jobFlightManager->acknowledged($message->getMessageId());
142
            $consumer->acknowledge($message);
143 1
        } catch (JobFailedException $e) {
144 1
            if ($job->isExpendable()) {
145
                $this->logger->debug(
146
                    'Expendable job failed. Acknowledging and letting it go.',
147
                    ['message' => $message->getBody()]
148
                );
149
                $this->jobFlightManager->letGo($message->getMessageId());
150
                $consumer->acknowledge($message);
151
                return;
152
            }
153 1
            $attempts = $message->getProperty('attempts', 0) + 1;
154 1
            if ($attempts >= $job->maxAttempts()) {
155
                if (!is_null($clear)) {
156
                    $clear();
157
                }
158
                $this->abandonedJobMessageStore->add(
159
                    new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
160
                );
161
                $this->logger->info(
162
                    'Rejecting the message reaching the max attempts.',
163
                    ['message' => $message->getBody()]
164
                );
165
                $this->jobFlightManager->rejected($message->getMessageId());
166
                $consumer->reject($message);
167 View Code Duplication
                if (!is_null($persist) && !$persist()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
168
                    $this->logger->info(
169
                        'Failed after the job. Requeueing the message.',
170
                        ['message' => $message->getBody()]
171
                    );
172
                    $newMessageId = $message->getMessageId() . '+';
173
                    $this->jobFlightManager->requeued($message->getMessageId(), $newMessageId);
174
                    $message->setMessageId($newMessageId);
175
                    $consumer->reject($message, true);
176
                }
177
                return;
178
            }
179 1
            $message->setProperty('attempts', $attempts);
180 1
            if ($job->reattemptDelay() > 0) {
181 1
                $message = $this->delayMessage($message, $job->reattemptDelay());
182
            }
183 1
            if (method_exists($message, 'setMessageDeduplicationId')) {
184 1
                $message->setMessageDeduplicationId(uniqid());
185
            }
186 1
            if (method_exists($message, 'setMessageGroupId')) {
187 1
                $message->setMessageGroupId(is_null($storedJob->fifoGroupId()) ? uniqid() : $storedJob->fifoGroupId());
188
            }
189 1
            $this->logger->info('Requeueing the message.', ['message' => $message->getBody()]);
190 1
            $newMessageId = $message->getMessageId() . '+';
191 1
            $this->jobFlightManager->requeued($message->getMessageId(), $newMessageId);
192 1
            $message->setMessageId($newMessageId);
193 1
            $consumer->reject($message, true);
194
        }
195 1
    }
196
197
    /**
198
     * @param PsrMessage $message
199
     * @param int $delay
200
     * @return PsrMessage
201
     */
202 1
    protected function delayMessage(PsrMessage $message, int $delay)
203
    {
204 1
        if ($message instanceof SqsMessage) {
205 1
            $message->setDelaySeconds($delay);
206
        }
207
208 1
        return $message;
209
    }
210
}
211