Test Failed
Push — master ( 03f785...9ba503 )
by Aya
02:11
created

EnqueueStoredJobsService   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 170
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 14

Test Coverage

Coverage 20.29%

Importance

Changes 0
Metric Value
wmc 17
lcom 1
cbo 14
dl 0
loc 170
ccs 14
cts 69
cp 0.2029
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 15 2
C execute() 0 66 11
A getStoredJobsToEnqueue() 0 8 1
A createProducer() 0 6 1
A createTopic() 0 6 1
A createMessage() 0 6 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Enqueue\Sqs\SqsMessage;
6
use Enqueue\Sqs\SqsProducer;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Interop\Queue\PsrProducer;
10
use Interop\Queue\PsrTopic;
11
use Psr\Log\LoggerAwareTrait;
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
use Shippinno\Job\Domain\Model\StoredJob;
15
use Shippinno\Job\Domain\Model\JobStore;
16
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
17
use Shippinno\Job\Domain\Model\StoredJobSerializer;
18
use Throwable;
19
20
/**
 * Publishes the stored jobs that have not been enqueued yet to a topic and
 * records the last stored job that was successfully handed to the producer.
 */
class EnqueueStoredJobsService
{
    use LoggerAwareTrait;

    /**
     * @var PsrContext
     */
    private $context;

    /**
     * @var JobStore
     */
    protected $jobStore;

    /**
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * @var EnqueuedStoredJobTrackerStore
     */
    protected $enqueuedStoredJobTrackerStore;

    /**
     * @var JobFlightManager
     */
    private $jobFlightManager;

    /**
     * @param PsrContext $context
     * @param JobStore $jobStore
     * @param StoredJobSerializer $storedJobSerializer
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
     * @param JobFlightManager|null $jobFlightManager Defaults to a NullJobFlightManager.
     * @param LoggerInterface|null $logger Defaults to a NullLogger.
     */
    public function __construct(
        PsrContext $context,
        JobStore $jobStore,
        StoredJobSerializer $storedJobSerializer,
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore,
        JobFlightManager $jobFlightManager = null,
        LoggerInterface $logger = null
    ) {
        $this->context = $context;
        $this->jobStore = $jobStore;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
        $this->jobFlightManager = $jobFlightManager ?? new NullJobFlightManager;
        // The logger is used unconditionally below, so it must never be null.
        $this->logger = $logger ?? new NullLogger;
    }

    /**
     * Enqueues every stored job newer than the last tracked one to the given topic.
     *
     * Whatever happens, the last stored job that was actually sent is tracked
     * before this method returns or throws, so a later run resumes after it.
     *
     * @param string $topicName
     * @return int Number of messages that were sent.
     * @throws FailedToEnqueueStoredJobException Wraps any error raised while building or
     *         sending messages; carries the number of messages sent before the failure.
     */
    public function execute(string $topicName): int
    {
        $enqueuedMessagesCount = 0;
        $lastEnqueuedStoredJob = null;
        $storedJobsToEnqueue = $this->getStoredJobsToEnqueue($topicName);
        if (0 === count($storedJobsToEnqueue)) {
            return $enqueuedMessagesCount;
        }
        $producer = $this->createProducer();
        $topic = $this->createTopic($topicName);
        try {
            $entries = $this->buildMessageEntries($storedJobsToEnqueue);
            if ($producer instanceof SqsProducer) {
                // Send in batches of 10 (the SQS SendMessageBatch entry limit).
                foreach (array_chunk($entries, 10) as $chunk) {
                    $producer->sendAll($topic, array_column($chunk, 'message'));
                    // Count and track only after the batch went out, so a failed
                    // batch is neither reported as enqueued nor skipped next time.
                    $enqueuedMessagesCount += count($chunk);
                    $lastEnqueuedStoredJob = $chunk[count($chunk) - 1]['storedJob'];
                    foreach ($chunk as $entry) {
                        $this->reportDeparture($entry, $topicName);
                    }
                }
            } else {
                foreach ($entries as $entry) {
                    $producer->send($topic, $entry['message']);
                    $enqueuedMessagesCount += 1;
                    $lastEnqueuedStoredJob = $entry['storedJob'];
                    $this->reportDeparture($entry, $topicName);
                }
            }
        } catch (Throwable $e) {
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
        } finally {
            // Runs on success and on failure: persist the progress made so far.
            if (null !== $lastEnqueuedStoredJob) {
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
                $this->logger->debug('last enqueued stored job update:',
                    ['jobId'=> $lastEnqueuedStoredJob->id()]);
            }
        }

        return $enqueuedMessagesCount;
    }

    /**
     * Builds one message per stored job, keeping the job and its name next to the message.
     *
     * Each entry carries its own stored job so callers do not have to map
     * message positions back to indexes of the input array.
     *
     * @param StoredJob[] $storedJobs
     * @return array[] List of ['jobName' => ..., 'message' => PsrMessage, 'storedJob' => StoredJob].
     */
    private function buildMessageEntries(array $storedJobs): array
    {
        $entries = [];
        foreach ($storedJobs as $storedJob) {
            $message = $this->createMessage($storedJob);
            $message->setMessageId($storedJob->id());
            if ($message instanceof SqsMessage) {
                $message->setMessageDeduplicationId(uniqid());
                // Jobs without a FIFO group get a fresh group id of their own.
                $message->setMessageGroupId($storedJob->fifoGroupId() ?? uniqid());
            }
            $entries[] = [
                'jobName' => $storedJob->name(),
                'message' => $message,
                'storedJob' => $storedJob,
            ];
        }

        return $entries;
    }

    /**
     * Tells the job flight manager that the entry's message departed for the topic.
     *
     * @param array $entry An entry produced by buildMessageEntries().
     * @param string $topicName
     * @return void
     */
    private function reportDeparture(array $entry, string $topicName)
    {
        $this->jobFlightManager->departed(
            $entry['message']->getMessageId(),
            $entry['jobName'],
            $topicName
        );
    }

    /**
     * Fetches the stored jobs recorded since the last one tracked for the topic.
     *
     * @param string $topicName
     * @return StoredJob[]
     */
    private function getStoredJobsToEnqueue(string $topicName): array
    {
        // Look the id up once so the logged value is exactly the one queried with.
        $lastEnqueuedStoredJobId = $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName);
        $this->logger->debug('last enqueued stored job:',
            ['jobId'=> $lastEnqueuedStoredJobId]);

        return $this->jobStore->storedJobsSince($lastEnqueuedStoredJobId);
    }

    /**
     * Creates the producer from the messaging context.
     *
     * @return PsrProducer
     */
    protected function createProducer(): PsrProducer
    {
        $producer = $this->context->createProducer();

        return $producer;
    }

    /**
     * Creates the topic with the given name from the messaging context.
     *
     * @param string $topicName
     * @return PsrTopic
     */
    protected function createTopic(string $topicName): PsrTopic
    {
        $topic = $this->context->createTopic($topicName);

        return $topic;
    }

    /**
     * Creates a message whose body is the serialized stored job.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    protected function createMessage(StoredJob $storedJob): PsrMessage
    {
        $message = $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));

        return $message;
    }
}
190