Passed
Push — master ( 1069fe...3696ab )
by Hirofumi
10:52
created

ConsumeStoredJobService   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 164
Duplicated Lines 10.98 %

Coupling/Cohesion

Components 1
Dependencies 18

Test Coverage

Coverage 93.24%

Importance

Changes 0
Metric Value
wmc 21
lcom 1
cbo 18
dl 18
loc 164
ccs 69
cts 74
cp 0.9324
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 17 2
D execute() 18 84 17
A delayMessage() 0 8 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Closure;
6
use Enqueue\Sqs\SqsMessage;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Psr\Log\LoggerAwareTrait;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Shippinno\Job\Application\Job\JobRunnerRegistry;
13
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
14
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
17
use Shippinno\Job\Domain\Model\JobSerializer;
18
use Shippinno\Job\Domain\Model\JobStore;
19
use Shippinno\Job\Domain\Model\StoredJobSerializer;
20
21
class ConsumeStoredJobService
22
{
23
    use LoggerAwareTrait;
24
25
    /**
26
     * @var StoredJobSerializer
27
     */
28
    private $storedJobSerializer;
29
30
    /**
31
     * @var PsrContext
32
     */
33
    private $context;
34
35
    /**
36
     * @var JobSerializer
37
     */
38
    private $jobSerializer;
39
40
    /**
41
     * @var JobRunnerRegistry
42
     */
43
    private $jobRunnerRegistry;
44
45
    /**
46
     * @var JobStore
47
     */
48
    private $jobStore;
49
50
    /**
51
     * @var AbandonedJobMessageStore
52
     */
53
    private $abandonedJobMessageStore;
54
55
    /**
56
     * @param PsrContext $context
57
     * @param StoredJobSerializer $storedJobSerializer
58
     * @param JobSerializer $jobSerializer
59
     * @param JobRunnerRegistry $jobRunnerRegistry
60
     * @param JobStore $jobStore
61
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
62
     * @param LoggerInterface|null $logger
63
     */
64 8
    public function __construct(
65
        PsrContext $context,
66
        StoredJobSerializer $storedJobSerializer,
67
        JobSerializer $jobSerializer,
68
        JobRunnerRegistry $jobRunnerRegistry,
69
        JobStore $jobStore,
70
        AbandonedJobMessageStore $abandonedJobMessageStore,
71
        LoggerInterface $logger = null
72
    ) {
73 8
        $this->context = $context;
74 8
        $this->storedJobSerializer = $storedJobSerializer;
75 8
        $this->jobSerializer = $jobSerializer;
76 8
        $this->jobRunnerRegistry = $jobRunnerRegistry;
77 8
        $this->jobStore = $jobStore;
78 8
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
79 8
        $this->setLogger(null !== $logger ? $logger : new NullLogger);
80 8
    }
81
82
    /**
83
     * @param string $queueName
84
     * @param Closure|null $persist
85
     */
86 8
    public function execute(string $queueName, Closure $persist = null, Closure $clear = null): void
87
    {
88 8
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
89 8
        $message = $consumer->receive(5000);
90 8
        if (null === $message) {
91 1
            return;
92
        }
93 7
        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
94 7
        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());
95
        try {
96 7
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
97 1
        } catch (JobRunnerNotRegisteredException $e) {
98 1
            $this->abandonedJobMessageStore->add(
99 1
                new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
100
            );
101 1
            $this->logger->alert(
102 1
                'No JobRunner is registered. Message is abandoned. Rejecting the message.',
103 1
                ['message' => $message->getBody()]
104
            );
105 1
            $consumer->reject($message);
106 1
            return;
107
        }
108
        try {
109 6
            $jobRunner->run($job);
110 3 View Code Duplication
            if (!is_null($persist) && !$persist()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
111 1
                $this->logger->info(
112 1
                    'Persistence failed after the job. Requeueing the message.',
113 1
                    ['message' => $message->getBody()]
114
                );
115 1
                $consumer->reject($message, true);
116 1
                return;
117
            }
118 2
            $dependentJobs = $job->dependentJobs();
119 2
            if (count($dependentJobs) > 0) {
120 1
                foreach ($dependentJobs as $dependentJob) {
121 1
                    $this->jobStore->append($dependentJob);
122
                }
123
            }
124 2
            $this->logger->info('Acknowledging message.', ['message' => $message->getBody()]);
125 2
            $consumer->acknowledge($message);
126 3
        } catch (JobFailedException $e) {
127 3
            if ($job->isExpendable()) {
128 1
                $this->logger->debug(
129 1
                    'Expendable job failed. Letting it go.',
130 1
                    ['message' => $message->getBody()]
131
                );
132 1
                return;
133
            }
134 2
            $attempts = $message->getProperty('attempts', 0) + 1;
135 2
            if ($attempts >= $job->maxAttempts()) {
136 1
                if (!is_null($clear)) {
137
                    $clear();
138
                }
139 1
                $this->abandonedJobMessageStore->add(
140 1
                    new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString())
141
                );
142 1
                $this->logger->info(
143 1
                    'Rejecting the message reaching the max attempts.',
144 1
                    ['message' => $message->getBody()]
145
                );
146 1
                $consumer->reject($message);
147 1 View Code Duplication
                if (!is_null($persist) && !$persist()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
148
                    $this->logger->info(
149
                        'Failed after the job. Requeueing the message.',
150
                        ['message' => $message->getBody()]
151
                    );
152
                    $consumer->reject($message, true);
153
                }
154 1
                return;
155
            }
156 1
            $message->setProperty('attempts', $attempts);
157 1
            if ($job->reattemptDelay() > 0) {
158 1
                $message = $this->delayMessage($message, $job->reattemptDelay());
159
            }
160 1
            if (method_exists($message, 'setMessageDeduplicationId')) {
161 1
                $message->setMessageDeduplicationId(uniqid());
162
            }
163 1 View Code Duplication
            if (method_exists($message, 'setMessageGroupId')) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
164 1
                $message->setMessageGroupId(is_null($storedJob->fifoGroupId()) ? uniqid() : $storedJob->fifoGroupId());
165
            }
166 1
            $this->logger->info('Requeueing the message.', ['message' => $message->getBody()]);
167 1
            $consumer->reject($message, true);
168
        }
169 3
    }
170
171
    /**
172
     * @param PsrMessage $message
173
     * @param int $delay
174
     * @return PsrMessage
175
     */
176 1
    protected function delayMessage(PsrMessage $message, int $delay)
177
    {
178 1
        if ($message instanceof SqsMessage) {
179 1
            $message->setDelaySeconds($delay);
180
        }
181
182 1
        return $message;
183
    }
184
}
185