Passed
Push — master ( 806c04...bf1804 )
by Hirofumi
05:36
created

EnqueueStoredJobsService::execute()   C

Complexity

Conditions 8
Paths 70

Size

Total Lines 33
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 8.0421

Importance

Changes 0
Metric Value
dl 0
loc 33
ccs 21
cts 23
cp 0.913
rs 5.3846
c 0
b 0
f 0
cc 8
eloc 24
nc 70
nop 1
crap 8.0421
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Interop\Queue\PsrContext;
6
use Interop\Queue\PsrMessage;
7
use Interop\Queue\PsrProducer;
8
use Interop\Queue\PsrTopic;
9
use Shippinno\Job\Domain\Model\StoredJob;
10
use Shippinno\Job\Domain\Model\JobStore;
11
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
12
use Shippinno\Job\Domain\Model\StoredJobSerializer;
13
use Throwable;
14
15
class EnqueueStoredJobsService
16
{
17
    /**
18
     * @var PsrContext
19
     */
20
    private $context;
21
22
    /**
23
     * @var JobStore
24
     */
25
    protected $jobStore;
26
27
    /**
28
     * @var StoredJobSerializer
29
     */
30
    private $storedJobSerializer;
31
32
    /**
33
     * @var EnqueuedStoredJobTrackerStore
34
     */
35
    protected $enqueuedStoredJobTrackerStore;
36
37
    /**
38
     * @param PsrContext $context
39
     * @param JobStore $jobStore
40
     * @param StoredJobSerializer $storedJobSerializer
41
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
42
     */
43 3
    public function __construct(
44
        PsrContext $context,
45
        JobStore $jobStore,
46
        StoredJobSerializer $storedJobSerializer,
47
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
48
    ) {
49 3
        $this->context = $context;
50 3
        $this->jobStore = $jobStore;
51 3
        $this->storedJobSerializer = $storedJobSerializer;
52 3
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
53 3
    }
54
55
    /**
56
     * @param string $topicName
57
     * @return int
58
     * @throws FailedToEnqueueStoredJobException
59
     */
60 3
    public function execute(string $topicName): int
61
    {
62 3
        $enqueuedMessagesCount = 0;
63 3
        $lastEnqueuedStoredJob = null;
64 3
        $storedJobsToEnqueue = $this->getStoredJobsToEnqueue($topicName);
65 3
        if (0 === count($storedJobsToEnqueue)) {
66 1
            return $enqueuedMessagesCount;
67
        }
68 2
        $producer = $this->createProducer();
69 2
        $topic = $this->createTopic($topicName);
70
        try {
71 2
            foreach ($storedJobsToEnqueue as $storedJob) {
72 2
                $message = $this->createMessage($storedJob);
73 2
                if (method_exists($message, 'setMessageDeduplicationId')) {
74
                    $message->setMessageDeduplicationId(uniqid());
75
                }
76 2
                if (method_exists($message, 'setMessageGroupId')) {
77
                    $message->setMessageGroupId(is_null($storedJob->fifoGroupId()) ? uniqid() : $storedJob->fifoGroupId());
78
                }
79 2
                $producer->send($topic, $message);
80 2
                $enqueuedMessagesCount = $enqueuedMessagesCount + 1;
81 2
                $lastEnqueuedStoredJob = $storedJob;
82
            }
83 1
        } catch (Throwable $e) {
84 1
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
85 1
        } finally {
86 2
            if (null !== $lastEnqueuedStoredJob) {
87 2
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
88
            }
89
        }
90
91 1
        return $enqueuedMessagesCount;
92
    }
93
94
    /**
95
     * @param string $topicName
96
     * @return StoredJob[]
97
     */
98 3
    private function getStoredJobsToEnqueue(string $topicName): array
99
    {
100 3
        return $this->jobStore->storedJobsSince(
101 3
            $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName)
102
        );
103
    }
104
105
    /**
106
     * @return PsrProducer
107
     */
108 2
    protected function createProducer(): PsrProducer
109
    {
110 2
        $producer = $this->context->createProducer();
111
112 2
        return $producer;
113
    }
114
115
    /**
116
     * @param string $topicName
117
     * @return PsrTopic
118
     */
119 2
    protected function createTopic(string $topicName): PsrTopic
120
    {
121 2
        $topic = $this->context->createTopic($topicName);
122
123 2
        return $topic;
124
    }
125
126
    /**
127
     * @param StoredJob $storedJob
128
     * @return PsrMessage
129
     */
130 2
    protected function createMessage(StoredJob $storedJob): PsrMessage
131
    {
132 2
        $message = $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
133
134 2
        return $message;
135
    }
136
}
137