Passed
Push — master ( 3f2ce0...6a9240 )
by Hirofumi
02:12
created

EnqueueStoredJobsService::serializer()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 8
nc 2
nop 0
crap 2
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Interop\Queue\Exception as QueueException;
6
use Interop\Queue\PsrContext;
7
use Interop\Queue\PsrMessage;
8
use Interop\Queue\PsrProducer;
9
use Interop\Queue\PsrTopic;
10
use Shippinno\Job\Domain\Model\StoredJob;
11
use Shippinno\Job\Domain\Model\JobStore;
12
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
13
use Shippinno\Job\Domain\Model\StoredJobSerializer;
14
15
class EnqueueStoredJobsService
16
{
17
    /**
18
     * @var PsrContext
19
     */
20
    private $context;
21
22
    /**
23
     * @var JobStore
24
     */
25
    protected $jobStore;
26
27
    /**
28
     * @var StoredJobSerializer
29
     */
30
    private $storedJobSerializer;
31
32
    /**
33
     * @var EnqueuedStoredJobTrackerStore
34
     */
35
    protected $enqueuedStoredJobTrackerStore;
36
37
    /**
38
     * @param PsrContext $context
39
     * @param JobStore $jobStore
40
     * @param StoredJobSerializer $storedJobSerializer
41
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
42
     */
43 3
    public function __construct(
44
        PsrContext $context,
45
        JobStore $jobStore,
46
        StoredJobSerializer $storedJobSerializer,
47
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
48
    ) {
49 3
        $this->context = $context;
50 3
        $this->jobStore = $jobStore;
51 3
        $this->storedJobSerializer = $storedJobSerializer;
52 3
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
53 3
    }
54
55
    /**
56
     * @param string $topicName
57
     * @return int
58
     * @throws FailedToEnqueueStoredJobException
59
     */
60 3
    public function execute(string $topicName): int
61
    {
62 3
        $enqueuedMessagesCount = 0;
63 3
        $lastEnqueuedStoredJob = null;
64 3
        $storedJobsToEnqueue = $this->getStoredJobsToEnqueue($topicName);
65 3
        if (0 === count($storedJobsToEnqueue)) {
66 1
            return $enqueuedMessagesCount;
67
        }
68 2
        $producer = $this->createProducer();
69 2
        $topic = $this->createTopic($topicName);
70
        try {
71 2
            foreach ($storedJobsToEnqueue as $storedJob) {
72 2
                $message = $this->createMessage($storedJob);
73 2
                $producer->send($topic, $message);
74 2
                $enqueuedMessagesCount = $enqueuedMessagesCount + 1;
75 2
                $lastEnqueuedStoredJob = $storedJob;
76
            }
77 1
        } catch (QueueException $e) {
78 1
            throw new FailedToEnqueueStoredJobException($e);
79 1
        } finally {
80 2
            if (null !== $lastEnqueuedStoredJob) {
81 2
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
82
            }
83
        }
84
85 1
        return $enqueuedMessagesCount;
86
    }
87
88
    /**
89
     * @param string $topicName
90
     * @return StoredJob[]
91
     */
92 3
    private function getStoredJobsToEnqueue(string $topicName): array
93
    {
94 3
        return $this->jobStore->storedJobsSince(
95 3
            $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName)
96
        );
97
    }
98
99
    /**
100
     * @return PsrProducer
101
     */
102 2
    protected function createProducer(): PsrProducer
103
    {
104 2
        $producer = $this->context->createProducer();
105
106 2
        return $producer;
107
    }
108
109
    /**
110
     * @param string $topicName
111
     * @return PsrTopic
112
     */
113 2
    protected function createTopic(string $topicName): PsrTopic
114
    {
115 2
        $topic = $this->context->createTopic($topicName);
116
117 2
        return $topic;
118
    }
119
120
    /**
121
     * @param StoredJob $storedJob
122
     * @return PsrMessage
123
     */
124 2
    protected function createMessage(StoredJob $storedJob): PsrMessage
125
    {
126 2
        $message = $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
127
128 2
        return $message;
129
    }
130
}
131