Test Failed
Push — master ( 63f589...659b12 )
by Hirofumi
12:45
created

EnqueueStoredJobsService::execute()   C

Complexity

Conditions 10
Paths 208

Size

Total Lines 48

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 10.0025

Importance

Changes 0
Metric Value
dl 0
loc 48
ccs 33
cts 34
cp 0.9706
rs 6.5212
c 0
b 0
f 0
cc 10
nc 208
nop 1
crap 10.0025

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Enqueue\Sqs\SqsMessage;
6
use Enqueue\Sqs\SqsProducer;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Interop\Queue\PsrProducer;
10
use Interop\Queue\PsrTopic;
11
use Shippinno\Job\Domain\Model\StoredJob;
12
use Shippinno\Job\Domain\Model\JobStore;
13
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
14
use Shippinno\Job\Domain\Model\StoredJobSerializer;
15
use Throwable;
16
17
class EnqueueStoredJobsService
{
    /**
     * Messaging context used to create producers, topics and messages.
     *
     * @var PsrContext
     */
    private $context;

    /**
     * Source of the stored jobs that still have to be enqueued.
     *
     * @var JobStore
     */
    protected $jobStore;

    /**
     * Turns a stored job into the message body.
     *
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * Remembers, per topic, the last stored job that was enqueued.
     *
     * @var EnqueuedStoredJobTrackerStore
     */
    protected $enqueuedStoredJobTrackerStore;

    /**
     * @param PsrContext $context
     * @param JobStore $jobStore
     * @param StoredJobSerializer $storedJobSerializer
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
     */
    public function __construct(
        PsrContext $context,
        JobStore $jobStore,
        StoredJobSerializer $storedJobSerializer,
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
    ) {
        $this->context = $context;
        $this->jobStore = $jobStore;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
    }

    /**
     * Sends every stored job newer than the last tracked one to the given topic.
     *
     * Whatever was sent successfully is recorded in the tracker store, even when
     * a later send fails, so the next run can resume after the last sent job.
     *
     * @param string $topicName
     * @return int Number of messages that were sent.
     * @throws FailedToEnqueueStoredJobException When building or sending a message fails;
     *                                           it carries the number of messages sent so far.
     */
    public function execute(string $topicName): int
    {
        $enqueuedMessagesCount = 0;
        $lastEnqueuedStoredJob = null;
        // Re-index so that position N in $messages always maps to position N here,
        // regardless of the keys the job store returned.
        $storedJobsToEnqueue = array_values($this->getStoredJobsToEnqueue($topicName));
        if (0 === count($storedJobsToEnqueue)) {
            return $enqueuedMessagesCount;
        }
        $producer = $this->createProducer();
        $topic = $this->createTopic($topicName);
        try {
            $messages = $this->createMessages($storedJobsToEnqueue);
            if ($producer instanceof SqsProducer) {
                // Batches of 10 — presumably the SQS per-batch limit; confirm against the producer.
                foreach (array_chunk($messages, 10) as $chunk) {
                    $producer->sendAll($topic, $chunk);
                    // Only account for the chunk once it has actually been sent. The running
                    // count also yields the right index for a final chunk shorter than 10.
                    $enqueuedMessagesCount += count($chunk);
                    $lastEnqueuedStoredJob = $storedJobsToEnqueue[$enqueuedMessagesCount - 1];
                }
            } else {
                foreach ($messages as $message) {
                    $producer->send($topic, $message);
                    $enqueuedMessagesCount++;
                    $lastEnqueuedStoredJob = $storedJobsToEnqueue[$enqueuedMessagesCount - 1];
                }
            }
        } catch (Throwable $e) {
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
        } finally {
            // Runs on success and on failure: persist progress made so far.
            if (null !== $lastEnqueuedStoredJob) {
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
            }
        }

        return $enqueuedMessagesCount;
    }

    /**
     * Fetches the stored jobs that come after the last one tracked for the topic.
     *
     * @param string $topicName
     * @return StoredJob[]
     */
    private function getStoredJobsToEnqueue(string $topicName): array
    {
        return $this->jobStore->storedJobsSince(
            $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName)
        );
    }

    /**
     * Builds one message per stored job, in the same order as the input.
     *
     * SQS messages additionally get a message id (the stored job id), a fresh
     * deduplication id, and a group id (the job's FIFO group id, or a fresh
     * unique id when the job has none).
     *
     * @param StoredJob[] $storedJobs
     * @return PsrMessage[]
     */
    private function createMessages(array $storedJobs): array
    {
        $messages = [];
        foreach ($storedJobs as $storedJob) {
            $message = $this->createMessage($storedJob);
            if ($message instanceof SqsMessage) {
                $message->setMessageId($storedJob->id());
                $message->setMessageDeduplicationId(uniqid());
                $message->setMessageGroupId($storedJob->fifoGroupId() ?? uniqid());
            }
            $messages[] = $message;
        }

        return $messages;
    }

    /**
     * Creates the producer used to send messages.
     *
     * @return PsrProducer
     */
    protected function createProducer(): PsrProducer
    {
        return $this->context->createProducer();
    }

    /**
     * Creates the topic object for the given topic name.
     *
     * @param string $topicName
     * @return PsrTopic
     */
    protected function createTopic(string $topicName): PsrTopic
    {
        return $this->context->createTopic($topicName);
    }

    /**
     * Creates a message whose body is the serialized stored job.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    protected function createMessage(StoredJob $storedJob): PsrMessage
    {
        return $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
    }
}
154