Test Failed
Push — master ( 2c619e...495924 )
by Hirofumi
03:41
created

EnqueueStoredJobsService::createProducer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Enqueue\Sqs\SqsMessage;
6
use Enqueue\Sqs\SqsProducer;
7
use Interop\Queue\PsrContext;
8
use Interop\Queue\PsrMessage;
9
use Interop\Queue\PsrProducer;
10
use Interop\Queue\PsrTopic;
11
use Psr\Log\LoggerAwareTrait;
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
use Shippinno\Job\Domain\Model\StoredJob;
15
use Shippinno\Job\Domain\Model\JobStore;
16
use Shippinno\Job\Domain\Model\FailedToEnqueueStoredJobException;
17
use Shippinno\Job\Domain\Model\StoredJobSerializer;
18
use Throwable;
19
20
class EnqueueStoredJobsService
21
{
22
    use LoggerAwareTrait;
23
24
    /**
25
     * @var PsrContext
26
     */
27
    private $context;
28
29
    /**
30
     * @var JobStore
31
     */
32
    protected $jobStore;
33
34
    /**
35
     * @var StoredJobSerializer
36
     */
37
    private $storedJobSerializer;
38
39
    /**
40
     * @var JobFlightManager
41
     */
42
    private $jobFlightManager;
43
44
    /**
45
     * @param PsrContext $context
46
     * @param JobStore $jobStore
47
     * @param StoredJobSerializer $storedJobSerializer
48
     * @param JobFlightManager|null $jobFlightManager
49
     * @param LoggerInterface|null $logger
50
     */
51 6
    public function __construct(
52
        PsrContext $context,
53
        JobStore $jobStore,
54
        StoredJobSerializer $storedJobSerializer,
55
        JobFlightManager $jobFlightManager = null,
56
        LoggerInterface $logger = null
57
    ) {
58 6
        $this->context = $context;
59 6
        $this->jobStore = $jobStore;
60 6
        $this->storedJobSerializer = $storedJobSerializer;
61 6
        $this->jobFlightManager = $jobFlightManager ?: new NullJobFlightManager;
62 6
        $this->setLogger($logger ?: new NullLogger);
63 6
    }
64
65
    /**
66
     * @param string $topicName
67
     * @return int
68
     * @throws FailedToEnqueueStoredJobException
69
     */
70 6
    public function execute(string $topicName): int
71
    {
72 6
        $enqueuedMessagesCount = 0;
73 6
        $preBoardingJobIds = $this->jobFlightManager->undepartedJobFlights($topicName);
74 6
        if (0 === count($preBoardingJobIds)) {
75 1
            return $enqueuedMessagesCount;
76
        }
77 5
        $this->logger->debug('Enqueueing jobs: ' . implode(',', $preBoardingJobIds));
78 5
        $storedJobsToEnqueue = $this->jobStore->storedJobsOfIds($preBoardingJobIds);
79 5
        $producer = $this->createProducer();
80 5
        $topic = $this->createTopic($topicName);
81
        try {
82 5
            $messages = [];
83 5
            foreach ($storedJobsToEnqueue as $storedJob) {
84 5
                $this->jobFlightManager->boarding($storedJob->id());
85 5
                $message = $this->createMessage($storedJob);
86 5
                $message->setMessageId($storedJob->id());
87 5
                if ($message instanceof SqsMessage) {
88 2
                    $message->setMessageDeduplicationId(uniqid());
89 2
                    $message->setMessageGroupId(
90 2
                        is_null($storedJob->fifoGroupId())
91 2
                            ? uniqid()
92 2
                            : $storedJob->fifoGroupId()
93
                    );
94
                }
95 5
                $messages[] = [
96 5
                    'jobName' => $storedJob->name(),
97 5
                    'message' => $message
98
                ];
99
            }
100 4
            if ($producer instanceof SqsProducer) {
101 2
                foreach (array_chunk($messages, 10) as $i => $chunk) {
102 2
                    $enqueuedMessagesCount = $enqueuedMessagesCount + count($chunk);
103 2
                    $enqueuedMessageIds = $producer->sendAll($topic, array_column($chunk, 'message'));
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $enqueuedMessageIds is correct as $producer->sendAll($topi...umn($chunk, 'message')) (which targets Enqueue\Sqs\SqsProducer::sendAll()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
104 1
                    foreach ($enqueuedMessageIds as $messageId) {
0 ignored issues
show
Bug introduced by
The expression $enqueuedMessageIds of type null is not traversable.
Loading history...
105 1
                        $this->jobFlightManager->departed($messageId);
106
                    }
107
                }
108
            } else {
109 2
                foreach ($messages as $i => $message) {
110 2
                    $producer->send($topic, $message['message']);
111 2
                    $enqueuedMessagesCount = $enqueuedMessagesCount + 1;
112 3
                    $this->jobFlightManager->departed($message['message']->getMessageId());
113
                }
114
            }
115 2
        } catch (Throwable $e) {
116 2
            throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e);
117
        }
118
119 3
        return $enqueuedMessagesCount;
120
    }
121
122
    /**
123
     * @return PsrProducer
124
     */
125 5
    protected function createProducer(): PsrProducer
126
    {
127 5
        $producer = $this->context->createProducer();
128
129 5
        return $producer;
130
    }
131
132
    /**
133
     * @param string $topicName
134
     * @return PsrTopic
135
     */
136 5
    protected function createTopic(string $topicName): PsrTopic
137
    {
138 5
        $topic = $this->context->createTopic($topicName);
139
140 5
        return $topic;
141
    }
142
143
    /**
144
     * @param StoredJob $storedJob
145
     * @return PsrMessage
146
     */
147 5
    protected function createMessage(StoredJob $storedJob): PsrMessage
148
    {
149 5
        $message = $this->context->createMessage($this->storedJobSerializer->serialize($storedJob));
150
151 5
        return $message;
152
    }
153
}
154