Test Failed
Push — master ( 9ba503...581a8d )
by Aya
01:52
created

EnqueueStoredJobsService::getStoredJobsToEnqueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Enqueue\Sqs\SqsMessage;
6
use Enqueue\Sqs\SqsProducer;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Interop\Queue\PsrProducer;
10
use Interop\Queue\PsrTopic;
11
use Psr\Log\LoggerAwareTrait;
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
use Shippinno\Job\Domain\Model\StoredJob;
15
use Shippinno\Job\Domain\Model\JobStore;
16
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
17
use Shippinno\Job\Domain\Model\StoredJobSerializer;
18
use Throwable;
19
20
/**
 * Publishes stored jobs that have not been enqueued yet to a message topic.
 *
 * The service asks the tracker store for the id of the last stored job that was
 * enqueued for a topic, loads every stored job since that id from the job store,
 * sends one message per job through the queue context, and finally records the
 * last job that was actually sent so the next run can resume from there.
 */
class EnqueueStoredJobsService
{
    use LoggerAwareTrait;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var JobStore
     */
    protected $jobStore;

    /**
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * @var EnqueuedStoredJobTrackerStore
     */
    protected $enqueuedStoredJobTrackerStore;

    /**
     * @var JobFlightManager
     */
    private $jobFlightManager;

    /**
     * @param PsrContext $context Factory for producers, topics and messages.
     * @param JobStore $jobStore Source of the stored jobs to enqueue.
     * @param StoredJobSerializer $storedJobSerializer Turns a stored job into a message body.
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore Remembers the last enqueued job per topic.
     * @param JobFlightManager|null $jobFlightManager Notified of every sent message; a NullJobFlightManager is used when omitted.
     * @param LoggerInterface|null $logger A NullLogger is used when omitted.
     */
    public function __construct(
        PsrContext $context,
        JobStore $jobStore,
        StoredJobSerializer $storedJobSerializer,
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore,
        JobFlightManager $jobFlightManager = null,
        LoggerInterface $logger = null
    ) {
        $this->context = $context;
        $this->jobStore = $jobStore;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
        $this->jobFlightManager = $jobFlightManager ?: new NullJobFlightManager;
        $this->setLogger($logger ?: new NullLogger);
    }

    /**
     * Enqueues every stored job recorded after the last tracked one for the topic.
     *
     * With an SqsProducer the messages are sent in batches of up to 10 through
     * sendAll(); with any other producer they are sent one by one. Whether the
     * run succeeds or fails, the last job that was sent is tracked in the
     * finally block so the next run does not start over.
     *
     * @param string $topicName Name of the topic to publish to.
     * @return int Number of messages sent.
     * @throws FailedToEnqueueStoredJobException When anything fails while building or sending
     *                                           messages; it carries the number of messages
     *                                           sent before the failure and the original error.
     */
    public function execute(string $topicName): int
    {
        $enqueuedMessagesCount = 0;
        $lastEnqueuedStoredJob = null;
        $storedJobsToEnqueue = $this->getStoredJobsToEnqueue($topicName);
        if (0 === count($storedJobsToEnqueue)) {
            return $enqueuedMessagesCount;
        }
        $producer = $this->createProducer();
        $topic = $this->createTopic($topicName);
        try {
            // Each entry keeps the stored job next to its message, so the "last
            // enqueued" job never has to be recovered by positional lookup into
            // $storedJobsToEnqueue (which would rely on it being a 0-based list).
            /** @var array[] $entries Maps with keys 'storedJob', 'jobName' and 'message'. */
            $entries = [];
            foreach ($storedJobsToEnqueue as $storedJob) {
                $message = $this->buildMessage($storedJob);
                $entries[] = [
                    'storedJob' => $storedJob,
                    'jobName' => $storedJob->name(),
                    'message' => $message,
                ];
            }
            if ($producer instanceof SqsProducer) {
                // Batches of 10 — presumably the SQS SendMessageBatch entry limit.
                foreach (array_chunk($entries, 10) as $chunk) {
                    $producer->sendAll($topic, array_column($chunk, 'message'));
                    // Record progress only after sendAll() returned: if it throws,
                    // this chunk must neither be counted nor move the tracker past
                    // jobs that were never sent.
                    // NOTE(review): if sendAll() can fail after delivering part of a
                    // batch, the whole chunk is re-sent on the next run — confirm
                    // that consumers tolerate the duplicates.
                    $enqueuedMessagesCount += count($chunk);
                    $lastEnqueuedStoredJob = $chunk[count($chunk) - 1]['storedJob'];
                    foreach ($chunk as $entry) {
                        $this->reportDeparture($entry, $topicName);
                    }
                }
            } else {
                foreach ($entries as $entry) {
                    $producer->send($topic, $entry['message']);
                    $enqueuedMessagesCount += 1;
                    $lastEnqueuedStoredJob = $entry['storedJob'];
                    $this->reportDeparture($entry, $topicName);
                }
            }
        } catch (Throwable $e) {
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
        } finally {
            // Runs on success and on failure alike, so partial progress is kept.
            if (null !== $lastEnqueuedStoredJob) {
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
                $this->logger->debug(
                    'last enqueued stored job update:',
                    ['jobId' => $lastEnqueuedStoredJob->id()]
                );
            }
        }

        return $enqueuedMessagesCount;
    }

    /**
     * Loads the stored jobs recorded since the last one enqueued for the topic.
     *
     * @param string $topicName
     * @return StoredJob[]
     */
    private function getStoredJobsToEnqueue(string $topicName): array
    {
        // Look the id up once so the logged value is exactly the one queried with.
        $lastEnqueuedStoredJobId = $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName);
        $this->logger->debug('last enqueued stored job:', ['jobId' => $lastEnqueuedStoredJobId]);

        return $this->jobStore->storedJobsSince($lastEnqueuedStoredJobId);
    }

    /**
     * Creates the message for a stored job and stamps it with the job's id.
     *
     * SQS messages additionally get a fresh deduplication id and a message
     * group id: the job's FIFO group id when it has one, otherwise a fresh
     * unique id.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    private function buildMessage(StoredJob $storedJob): PsrMessage
    {
        $message = $this->createMessage($storedJob);
        $message->setMessageId($storedJob->id());
        if ($message instanceof SqsMessage) {
            $message->setMessageDeduplicationId(uniqid());
            $message->setMessageGroupId(
                is_null($storedJob->fifoGroupId())
                    ? uniqid()
                    : $storedJob->fifoGroupId()
            );
        }

        return $message;
    }

    /**
     * Tells the job flight manager that a message has been sent.
     *
     * @param array $entry Map with the keys 'jobName' and 'message' built in execute().
     * @param string $topicName
     */
    private function reportDeparture(array $entry, string $topicName)
    {
        $this->jobFlightManager->departed(
            $entry['message']->getMessageId(),
            $entry['jobName'],
            $topicName
        );
    }

    /**
     * Creates the producer used to send messages.
     *
     * @return PsrProducer
     */
    protected function createProducer(): PsrProducer
    {
        $producer = $this->context->createProducer();

        return $producer;
    }

    /**
     * Creates the topic with the given name.
     *
     * @param string $topicName
     * @return PsrTopic
     */
    protected function createTopic(string $topicName): PsrTopic
    {
        $topic = $this->context->createTopic($topicName);

        return $topic;
    }

    /**
     * Creates a message whose body is the serialized stored job.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    protected function createMessage(StoredJob $storedJob): PsrMessage
    {
        $message = $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));

        return $message;
    }
}
190