Passed
Push — master ( e4c413...18bb85 )
by Hirofumi
09:56 queued 06:38
created

EnqueueStoredJobsService::createProducer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Interop\Queue\PsrContext;
6
use Interop\Queue\PsrMessage;
7
use Interop\Queue\PsrProducer;
8
use Interop\Queue\PsrTopic;
9
use Shippinno\Job\Domain\Model\StoredJob;
10
use Shippinno\Job\Domain\Model\JobStore;
11
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
12
use Shippinno\Job\Domain\Model\StoredJobSerializer;
13
use Throwable;
14
15
/**
 * Application service that publishes stored jobs to a message topic.
 *
 * Each run picks up after the last stored job that was recorded as enqueued
 * for the given topic, sends every newer job, and records the new position.
 */
class EnqueueStoredJobsService
{
    /**
     * Messaging context used to create producers, topics and messages.
     *
     * @var PsrContext
     */
    private $context;

    /**
     * Source of the stored jobs to publish.
     *
     * @var JobStore
     */
    protected $jobStore;

    /**
     * Turns a stored job into the message body.
     *
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * Remembers, per topic, the last stored job that was enqueued.
     *
     * @var EnqueuedStoredJobTrackerStore
     */
    protected $enqueuedStoredJobTrackerStore;

    /**
     * @param PsrContext $context
     * @param JobStore $jobStore
     * @param StoredJobSerializer $storedJobSerializer
     * @param EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
     */
    public function __construct(
        PsrContext $context,
        JobStore $jobStore,
        StoredJobSerializer $storedJobSerializer,
        EnqueuedStoredJobTrackerStore $enqueuedStoredJobTrackerStore
    ) {
        $this->context = $context;
        $this->jobStore = $jobStore;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->enqueuedStoredJobTrackerStore = $enqueuedStoredJobTrackerStore;
    }

    /**
     * Sends every stored job that has not yet been enqueued to the topic.
     *
     * When there is nothing to send, no producer or topic is created and 0 is
     * returned. If sending fails part-way through, the last job that was sent
     * successfully is still recorded before the exception propagates, so the
     * next run resumes after it.
     *
     * @param string $topicName Name of the topic to publish to.
     * @return int Number of messages that were sent.
     * @throws FailedToEnqueueStoredJobException When anything thrown while
     *         building or sending a message interrupts the run; it carries the
     *         number of messages sent so far and the original cause.
     */
    public function execute(string $topicName): int
    {
        $enqueuedMessagesCount = 0;
        $lastEnqueuedStoredJob = null;
        $storedJobsToEnqueue = $this->getStoredJobsToEnqueue($topicName);
        if (0 === count($storedJobsToEnqueue)) {
            return $enqueuedMessagesCount;
        }
        $producer = $this->createProducer();
        $topic = $this->createTopic($topicName);
        try {
            foreach ($storedJobsToEnqueue as $storedJob) {
                $message = $this->createMessage($storedJob);
                // These setters are not guaranteed by the PsrMessage type hint,
                // so they are only used when the concrete message offers them.
                if (method_exists($message, 'setMessageDeduplicationId')) {
                    // uniqid() is derived from the clock and is documented as
                    // not guaranteeing uniqueness; a colliding deduplication id
                    // could make a broker discard a job, so use random bytes.
                    $message->setMessageDeduplicationId(bin2hex(random_bytes(16)));
                }
                if (method_exists($message, 'setMessageGroupId')) {
                    // Jobs without an explicit group get a fresh random one.
                    $message->setMessageGroupId(
                        $storedJob->fifoGroupId() ?? bin2hex(random_bytes(16))
                    );
                }
                $producer->send($topic, $message);
                ++$enqueuedMessagesCount;
                $lastEnqueuedStoredJob = $storedJob;
            }
        } catch (Throwable $e) {
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
        } finally {
            // Runs on success and on failure: persist progress for whatever
            // was actually sent.
            if (null !== $lastEnqueuedStoredJob) {
                $this->enqueuedStoredJobTrackerStore->trackLastEnqueuedStoredJob($topicName, $lastEnqueuedStoredJob);
            }
        }

        return $enqueuedMessagesCount;
    }

    /**
     * Fetches the stored jobs that come after the last one tracked as
     * enqueued for the given topic.
     *
     * @param string $topicName
     * @return StoredJob[]
     */
    private function getStoredJobsToEnqueue(string $topicName): array
    {
        return $this->jobStore->storedJobsSince(
            $this->enqueuedStoredJobTrackerStore->lastEnqueuedStoredJobId($topicName)
        );
    }

    /**
     * Creates the producer used to send messages.
     *
     * @return PsrProducer
     */
    protected function createProducer(): PsrProducer
    {
        return $this->context->createProducer();
    }

    /**
     * Creates the topic object for the given name.
     *
     * @param string $topicName
     * @return PsrTopic
     */
    protected function createTopic(string $topicName): PsrTopic
    {
        return $this->context->createTopic($topicName);
    }

    /**
     * Builds a message whose body is the serialized stored job.
     *
     * @param StoredJob $storedJob
     * @return PsrMessage
     */
    protected function createMessage(StoredJob $storedJob): PsrMessage
    {
        return $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
    }
}
141