Passed
Push — master ( 659b12...6731a8 )
by Hirofumi
02:31
created

EnqueueStoredJobsService::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.9
c 0
b 0
f 0
cc 1
nc 1
nop 4
crap 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Enqueue\Sqs\SqsMessage;
6
use Enqueue\Sqs\SqsProducer;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Interop\Queue\PsrProducer;
10
use Interop\Queue\PsrTopic;
11
use Shippinno\Job\Domain\Model\StoredJob;
12
use Shippinno\Job\Domain\Model\JobStore;
13
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
14
use Shippinno\Job\Domain\Model\StoredJobSerializer;
15
use Throwable;
16
17
class EnqueueStoredJobsService
18
{
19
    /**
20
     * @var PsrContext
21
     */
22
    private $context;
23
24
    /**
25
     * @var JobStore
26
     */
27
    protected $jobStore;
28
29
    /**
30
     * @var StoredJobSerializer
31
     */
32
    private $storedJobSerializer;
33
34
    /**
35
     * @var EnqueuedStoredJobTrackerStore
36
     */
37
    protected $enqueuedStoredJobTrackerStore;
38
39
    /**
40
     * @param PsrContext $context
41
     * @param JobStore $jobStore
42
     * @param StoredJobSerializer $storedJobSerializer
43
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
44
     */
45 5
    public function __construct(
46
        PsrContext $context,
47
        JobStore $jobStore,
48
        StoredJobSerializer $storedJobSerializer,
49
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
50
    ) {
51 5
        $this->context = $context;
52 5
        $this->jobStore = $jobStore;
53 5
        $this->storedJobSerializer = $storedJobSerializer;
54 5
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
55 5
    }
56
57
    /**
58
     * @param string $topicName
59
     * @return int
60
     * @throws FailedToEnqueueStoredJobException
61
     */
62 5
    public function execute(string $topicName): int
63
    {
64 5
        $enqueuedMessagesCount = 0;
65 5
        $lastEnqueuedStoredJob = null;
66 5
        $storedJobsToEnqueue = $this->getStoredJobsToEnqueue($topicName);
67 5
        if (0 === count($storedJobsToEnqueue)) {
68 1
            return $enqueuedMessagesCount;
69
        }
70 4
        $producer = $this->createProducer();
71 4
        $topic = $this->createTopic($topicName);
72
        try {
73 4
            $messages = [];
74 4
            foreach ($storedJobsToEnqueue as $storedJob) {
75 4
                $message = $this->createMessage($storedJob);
76 4
                if ($message instanceof SqsMessage) {
77 2
                    $message->setMessageId($storedJob->id());
78 2
                    $message->setMessageDeduplicationId(uniqid());
79 2
                    $message->setMessageGroupId(
80 2
                        is_null($storedJob->fifoGroupId())
81 2
                            ? uniqid()
82 2
                            : $storedJob->fifoGroupId()
83
                    );
84
                }
85 4
                $messages[] = $message;
86
            }
87 4
            if ($producer instanceof SqsProducer) {
88 2
                foreach (array_chunk($messages, 10) as $i => $chunk) {
89 2
                    $enqueuedMessagesCount = $enqueuedMessagesCount + count($chunk);
90 2
                    $lastEnqueuedStoredJob = $storedJobsToEnqueue[$i * 10 + count($chunk) - 1];
91 2
                    $producer->sendAll($topic, $chunk);
92
                }
93
            } else {
94 2
                foreach ($messages as $i => $message) {
95 2
                    $producer->send($topic, $message);
96 2
                    $enqueuedMessagesCount = $enqueuedMessagesCount + 1;
97 3
                    $lastEnqueuedStoredJob = $storedJobsToEnqueue[$i];
98
                }
99
            }
100 2
        } catch (Throwable $e) {
101 2
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
102 2
        } finally {
103 4
            if (null !== $lastEnqueuedStoredJob) {
104 4
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
105
            }
106
        }
107
108 2
        return $enqueuedMessagesCount;
109
    }
110
111
    /**
112
     * @param string $topicName
113
     * @return StoredJob[]
114
     */
115 5
    private function getStoredJobsToEnqueue(string $topicName): array
116
    {
117 5
        return $this->jobStore->storedJobsSince(
118 5
            $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName)
119
        );
120
    }
121
122
    /**
123
     * @return PsrProducer
124
     */
125 4
    protected function createProducer(): PsrProducer
126
    {
127 4
        $producer = $this->context->createProducer();
128
129 4
        return $producer;
130
    }
131
132
    /**
133
     * @param string $topicName
134
     * @return PsrTopic
135
     */
136 4
    protected function createTopic(string $topicName): PsrTopic
137
    {
138 4
        $topic = $this->context->createTopic($topicName);
139
140 4
        return $topic;
141
    }
142
143
    /**
144
     * @param StoredJob $storedJob
145
     * @return PsrMessage
146
     */
147 4
    protected function createMessage(StoredJob $storedJob): PsrMessage
148
    {
149 4
        $message = $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
150
151 4
        return $message;
152
    }
153
}
154