1 | <?php |
||
20 | class EnqueueStoredJobsService |
||
21 | { |
||
22 | use LoggerAwareTrait; |
||
23 | |||
24 | /** |
||
25 | * @var PsrContext |
||
26 | */ |
||
27 | private $context; |
||
28 | |||
29 | /** |
||
30 | * @var JobStore |
||
31 | */ |
||
32 | protected $jobStore; |
||
33 | |||
34 | /** |
||
35 | * @var StoredJobSerializer |
||
36 | */ |
||
37 | private $storedJobSerializer; |
||
38 | |||
39 | /** |
||
40 | * @var JobFlightManager |
||
41 | */ |
||
42 | private $jobFlightManager; |
||
43 | |||
44 | /** |
||
45 | * @param PsrContext $context |
||
46 | * @param JobStore $jobStore |
||
47 | * @param StoredJobSerializer $storedJobSerializer |
||
48 | * @param JobFlightManager|null $jobFlightManager |
||
49 | * @param LoggerInterface|null $logger |
||
50 | */ |
||
51 | 6 | public function __construct( |
|
52 | PsrContext $context, |
||
53 | JobStore $jobStore, |
||
54 | StoredJobSerializer $storedJobSerializer, |
||
55 | JobFlightManager $jobFlightManager = null, |
||
56 | LoggerInterface $logger = null |
||
57 | ) { |
||
58 | 6 | $this->context = $context; |
|
59 | 6 | $this->jobStore = $jobStore; |
|
60 | 6 | $this->storedJobSerializer = $storedJobSerializer; |
|
61 | 6 | $this->jobFlightManager = $jobFlightManager ?: new NullJobFlightManager; |
|
62 | 6 | $this->setLogger($logger ?: new NullLogger); |
|
63 | 6 | } |
|
64 | |||
65 | /** |
||
66 | * @param string $topicName |
||
67 | * @return int |
||
68 | * @throws FailedToEnqueueStoredJobException |
||
69 | */ |
||
70 | 6 | public function execute(string $topicName): int |
|
71 | { |
||
72 | 6 | $enqueuedMessagesCount = 0; |
|
73 | 6 | $preBoardingJobIds = $this->jobFlightManager->undepartedJobFlights($topicName); |
|
74 | 6 | if (0 === count($preBoardingJobIds)) { |
|
75 | 1 | return $enqueuedMessagesCount; |
|
76 | } |
||
77 | 5 | $this->logger->debug('Enqueueing jobs: ' . implode(',', $preBoardingJobIds)); |
|
78 | 5 | $storedJobsToEnqueue = $this->jobStore->storedJobsOfIds($preBoardingJobIds); |
|
79 | 5 | $producer = $this->createProducer(); |
|
80 | 5 | $topic = $this->createTopic($topicName); |
|
81 | try { |
||
82 | 5 | $messages = []; |
|
83 | 5 | foreach ($storedJobsToEnqueue as $storedJob) { |
|
84 | 5 | $this->jobFlightManager->boarding($storedJob->id()); |
|
85 | 5 | $message = $this->createMessage($storedJob); |
|
86 | 5 | $message->setMessageId($storedJob->id()); |
|
87 | 5 | if ($message instanceof SqsMessage) { |
|
88 | 2 | $message->setMessageDeduplicationId(uniqid()); |
|
89 | 2 | $message->setMessageGroupId( |
|
90 | 2 | is_null($storedJob->fifoGroupId()) |
|
91 | 2 | ? uniqid() |
|
92 | 2 | : $storedJob->fifoGroupId() |
|
93 | ); |
||
94 | } |
||
95 | 5 | $messages[] = [ |
|
96 | 5 | 'jobName' => $storedJob->name(), |
|
97 | 5 | 'message' => $message |
|
98 | ]; |
||
99 | } |
||
100 | 4 | if ($producer instanceof SqsProducer) { |
|
101 | 2 | foreach (array_chunk($messages, 10) as $i => $chunk) { |
|
102 | 2 | $enqueuedMessagesCount = $enqueuedMessagesCount + count($chunk); |
|
103 | 2 | $enqueuedMessageIds = $producer->sendAll($topic, array_column($chunk, 'message')); |
|
|
|||
104 | 1 | foreach ($enqueuedMessageIds as $messageId) { |
|
105 | 1 | $this->jobFlightManager->departed($messageId); |
|
106 | } |
||
107 | } |
||
108 | } else { |
||
109 | 2 | foreach ($messages as $i => $message) { |
|
110 | 2 | $producer->send($topic, $message['message']); |
|
111 | 2 | $enqueuedMessagesCount = $enqueuedMessagesCount + 1; |
|
112 | 3 | $this->jobFlightManager->departed($message['message']->getMessageId()); |
|
113 | } |
||
114 | } |
||
115 | 2 | } catch (Throwable $e) { |
|
116 | 2 | throw new FailedToEnqueueStoredJobException($enqueuedMessagesCount, $e); |
|
117 | } |
||
118 | |||
119 | 3 | return $enqueuedMessagesCount; |
|
120 | } |
||
121 | |||
122 | /** |
||
123 | * @return PsrProducer |
||
124 | */ |
||
125 | 5 | protected function createProducer(): PsrProducer |
|
131 | |||
132 | /** |
||
133 | * @param string $topicName |
||
134 | * @return PsrTopic |
||
135 | */ |
||
136 | 5 | protected function createTopic(string $topicName): PsrTopic |
|
142 | |||
143 | /** |
||
144 | * @param StoredJob $storedJob |
||
145 | * @return PsrMessage |
||
146 | */ |
||
147 | 5 | protected function createMessage(StoredJob $storedJob): PsrMessage |
|
153 | } |
||
154 |
This check looks for function or method calls that always return null and whose return value is assigned to a variable.
The method
getObject()
can return nothing but null, so it makes no sense to assign that value to a variable.The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.