Test Failed
Push — master ( f0cc3f...73392d )
by Hirofumi
02:22
created

EnqueueStoredJobsService   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 164
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 15

Test Coverage

Coverage 92.31%

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 15
dl 0
loc 164
ccs 60
cts 65
cp 0.9231
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 15 3
C execute() 0 60 11
A getStoredJobsToEnqueue() 0 8 1
A createProducer() 0 6 1
A createTopic() 0 6 1
A createMessage() 0 6 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Enqueue\Sqs\SqsMessage;
6
use Enqueue\Sqs\SqsProducer;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Interop\Queue\PsrProducer;
10
use Interop\Queue\PsrTopic;
11
use Psr\Log\LoggerAwareTrait;
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
use Shippinno\Job\Domain\Model\StoredJob;
15
use Shippinno\Job\Domain\Model\JobStore;
16
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
17
use Shippinno\Job\Domain\Model\StoredJobSerializer;
18
use Throwable;
19
20
/**
 * Application service that publishes stored jobs to a message topic.
 *
 * For a given topic name it asks the job flight manager which jobs are waiting,
 * loads them from the job store, turns each one into a queue message and sends
 * them through the producer created by the injected context. The last job that
 * was actually sent is recorded in the enqueued-stored-job tracker store.
 */
class EnqueueStoredJobsService
{
    use LoggerAwareTrait;

    /**
     * Chunk size used when batching through SqsProducer::sendAll().
     * 10 is the per-request entry limit of the SQS SendMessageBatch API.
     */
    private const SQS_BATCH_SIZE = 10;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var JobStore
     */
    protected $jobStore;

    /**
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * @var EnqueuedStoredJobTrackerStore
     */
    protected $enqueuedStoredJobTrackerStore;

    /**
     * @var JobFlightManager
     */
    private $jobFlightManager;

    /**
     * @param PsrContext $context Factory for producers, topics and messages.
     * @param JobStore $jobStore Source of the stored jobs to publish.
     * @param StoredJobSerializer $storedJobSerializer Produces the message body from a stored job.
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore Records the last job enqueued per topic.
     * @param JobFlightManager|null $jobFlightManager Falls back to NullJobFlightManager when omitted.
     * @param LoggerInterface|null $logger Falls back to NullLogger when omitted.
     */
    public function __construct(
        PsrContext $context,
        JobStore $jobStore,
        StoredJobSerializer $storedJobSerializer,
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore,
        ?JobFlightManager $jobFlightManager = null,
        ?LoggerInterface $logger = null
    ) {
        $this->context = $context;
        $this->jobStore = $jobStore;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
        $this->jobFlightManager = $jobFlightManager ?? new NullJobFlightManager();
        $this->setLogger($logger ?? new NullLogger());
    }

    /**
     * Enqueues every job currently waiting for the given topic.
     *
     * Messages are sent in batches of SQS_BATCH_SIZE when the producer is an
     * SqsProducer, and one by one otherwise. The enqueued count, the "last
     * enqueued" job and the departed notifications are only updated after the
     * corresponding send call has returned, so a failed send is never reported
     * or tracked as enqueued.
     *
     * @param string $topicName
     * @return int Number of messages that were sent.
     * @throws FailedToEnqueueStoredJobException When preparing or sending fails; it carries the
     *                                           number of messages sent so far and the cause.
     */
    public function execute(string $topicName): int
    {
        $enqueuedMessagesCount = 0;
        $lastEnqueuedStoredJob = null;

        $jobFlightIds = $this->jobFlightManager->preBoardingJobFlights($topicName);
        if (0 === count($jobFlightIds)) {
            return $enqueuedMessagesCount;
        }

        // NOTE(review): static analysis reports that preBoardingJobFlights() is documented
        // to return JobFlight objects while storedJobsOfIds() expects integer ids.
        // Neither declaration is visible here - confirm which side is right.
        $storedJobsToEnqueue = $this->jobStore->storedJobsOfIds($jobFlightIds);
        $producer = $this->createProducer();
        $topic = $this->createTopic($topicName);

        try {
            // Keep each stored job next to its message so the last enqueued job can be
            // identified without assuming how $storedJobsToEnqueue is keyed.
            $entries = [];
            foreach ($storedJobsToEnqueue as $storedJob) {
                $this->jobFlightManager->boarding($storedJob->id());
                $entries[] = [
                    'storedJob' => $storedJob,
                    'message' => $this->prepareMessage($storedJob),
                ];
            }

            $sendsInBatches = $producer instanceof SqsProducer;
            $batchSize = $sendsInBatches ? self::SQS_BATCH_SIZE : 1;

            foreach (array_chunk($entries, $batchSize) as $batch) {
                if ($sendsInBatches) {
                    $producer->sendAll($topic, array_column($batch, 'message'));
                } else {
                    $producer->send($topic, $batch[0]['message']);
                }

                // Only reached when the send above did not throw.
                $enqueuedMessagesCount += count($batch);
                $lastEnqueuedStoredJob = $batch[count($batch) - 1]['storedJob'];
                foreach ($batch as $entry) {
                    $this->jobFlightManager->departed($entry['message']->getMessageId());
                }
            }
        } catch (Throwable $e) {
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
        } finally {
            // Runs on success and on failure, so a partial run still records its progress.
            if (null !== $lastEnqueuedStoredJob) {
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
                $this->logger->debug(
                    'last enqueued stored job update:',
                    ['jobId' => $lastEnqueuedStoredJob->id()]
                );
            }
        }

        return $enqueuedMessagesCount;
    }

    /**
     * Builds the queue message for a stored job and stamps it with the job id.
     *
     * SqsMessage instances additionally get a fresh uniqid() as deduplication id and
     * the job's FIFO group id, or another uniqid() when the job has none.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    private function prepareMessage(StoredJob $storedJob): PsrMessage
    {
        $message = $this->createMessage($storedJob);
        $message->setMessageId($storedJob->id());

        if ($message instanceof SqsMessage) {
            $message->setMessageDeduplicationId(uniqid());
            $message->setMessageGroupId($storedJob->fifoGroupId() ?? uniqid());
        }

        return $message;
    }

    /**
     * Loads the stored jobs recorded after the last enqueued job of the topic.
     *
     * NOTE(review): static analysis flags this private method as unused within the
     * class; it is kept so that removing it stays a separate, deliberate change.
     *
     * @param string $topicName
     * @return StoredJob[]
     */
    private function getStoredJobsToEnqueue(string $topicName): array
    {
        // Look the id up once so the logged value is the one actually queried with.
        $lastEnqueuedStoredJobId = $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName);
        $this->logger->debug('last enqueued stored job:', ['jobId' => $lastEnqueuedStoredJobId]);

        return $this->jobStore->storedJobsSince($lastEnqueuedStoredJobId);
    }

    /**
     * Creates the producer used to send messages; protected so subclasses can replace it.
     *
     * @return PsrProducer
     */
    protected function createProducer(): PsrProducer
    {
        return $this->context->createProducer();
    }

    /**
     * Creates the topic that messages are sent to.
     *
     * @param string $topicName
     * @return PsrTopic
     */
    protected function createTopic(string $topicName): PsrTopic
    {
        return $this->context->createTopic($topicName);
    }

    /**
     * Creates a message whose body is the serialized stored job.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    protected function createMessage(StoredJob $storedJob): PsrMessage
    {
        return $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
    }
}
184