1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Shippinno\Job\Application\Messaging; |
4
|
|
|
|
5
|
|
|
use Closure; |
6
|
|
|
use Enqueue\Sqs\SqsMessage; |
7
|
|
|
use Interop\Queue\PsrContext; |
8
|
|
|
use Interop\Queue\PsrMessage; |
9
|
|
|
use Psr\Log\LoggerAwareTrait; |
10
|
|
|
use Psr\Log\LoggerInterface; |
11
|
|
|
use Psr\Log\NullLogger; |
12
|
|
|
use Shippinno\Job\Application\Job\JobRunnerRegistry; |
13
|
|
|
use Shippinno\Job\Domain\Model\AbandonedJobMessage; |
14
|
|
|
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore; |
15
|
|
|
use Shippinno\Job\Domain\Model\JobFailedException; |
16
|
|
|
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException; |
17
|
|
|
use Shippinno\Job\Domain\Model\JobSerializer; |
18
|
|
|
use Shippinno\Job\Domain\Model\JobStore; |
19
|
|
|
use Shippinno\Job\Domain\Model\StoredJobSerializer; |
20
|
|
|
|
21
|
|
|
class ConsumeStoredJobService |
22
|
|
|
{ |
23
|
|
|
use LoggerAwareTrait; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* @var StoredJobSerializer |
27
|
|
|
*/ |
28
|
|
|
private $storedJobSerializer; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @var PsrContext |
32
|
|
|
*/ |
33
|
|
|
private $context; |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* @var JobSerializer |
37
|
|
|
*/ |
38
|
|
|
private $jobSerializer; |
39
|
|
|
|
40
|
|
|
/** |
41
|
|
|
* @var JobRunnerRegistry |
42
|
|
|
*/ |
43
|
|
|
private $jobRunnerRegistry; |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* @var JobStore |
47
|
|
|
*/ |
48
|
|
|
private $jobStore; |
49
|
|
|
|
50
|
|
|
/** |
51
|
|
|
* @var AbandonedJobMessageStore |
52
|
|
|
*/ |
53
|
|
|
private $abandonedJobMessageStore; |
54
|
|
|
|
55
|
|
|
/** |
56
|
|
|
* @var JobFlightManager |
57
|
|
|
*/ |
58
|
|
|
private $jobFlightManager; |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @param PsrContext $context |
62
|
|
|
* @param StoredJobSerializer $storedJobSerializer |
63
|
|
|
* @param JobSerializer $jobSerializer |
64
|
|
|
* @param JobRunnerRegistry $jobRunnerRegistry |
65
|
|
|
* @param JobStore $jobStore |
66
|
|
|
* @param AbandonedJobMessageStore $abandonedJobMessageStore |
67
|
|
|
* @param JobFlightManager|null $jobFlightManager |
68
|
|
|
* @param LoggerInterface|null $logger |
69
|
|
|
*/ |
70
|
1 |
|
public function __construct( |
71
|
|
|
PsrContext $context, |
72
|
|
|
StoredJobSerializer $storedJobSerializer, |
73
|
|
|
JobSerializer $jobSerializer, |
74
|
|
|
JobRunnerRegistry $jobRunnerRegistry, |
75
|
|
|
JobStore $jobStore, |
76
|
|
|
AbandonedJobMessageStore $abandonedJobMessageStore, |
77
|
|
|
JobFlightManager $jobFlightManager = null, |
78
|
|
|
LoggerInterface $logger = null |
79
|
|
|
) { |
80
|
1 |
|
$this->context = $context; |
81
|
1 |
|
$this->storedJobSerializer = $storedJobSerializer; |
82
|
1 |
|
$this->jobSerializer = $jobSerializer; |
83
|
1 |
|
$this->jobRunnerRegistry = $jobRunnerRegistry; |
84
|
1 |
|
$this->jobStore = $jobStore; |
85
|
1 |
|
$this->abandonedJobMessageStore = $abandonedJobMessageStore; |
86
|
1 |
|
$this->jobFlightManager = $jobFlightManager ?: new NullJobFlightManager; |
87
|
1 |
|
$this->setLogger($logger ?: new NullLogger); |
88
|
1 |
|
} |
89
|
|
|
|
90
|
|
|
/** |
91
|
|
|
* @param string $queueName |
92
|
|
|
* @param Closure|null $persist |
93
|
|
|
*/ |
94
|
1 |
|
public function execute(string $queueName, Closure $persist = null, Closure $clear = null): void |
95
|
|
|
{ |
96
|
1 |
|
$consumer = $this->context->createConsumer($this->context->createQueue($queueName)); |
97
|
1 |
|
$message = $consumer->receive(5000); |
98
|
1 |
|
if (null === $message) { |
99
|
1 |
|
return; |
100
|
|
|
} |
101
|
|
|
$storedJob = $this->storedJobSerializer->deserialize($message->getBody()); |
102
|
|
|
if (null === $message->getMessageId()) { |
103
|
|
|
$this->logger->alert('Message without ID. Filling it.', ['message' => $message->getBody()]); |
104
|
|
|
$message->setMessageId($storedJob->id()); |
105
|
|
|
} |
106
|
|
|
$this->jobFlightManager->arrived($message->getMessageId()); |
107
|
|
|
$job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name()); |
108
|
|
|
try { |
109
|
|
|
$jobRunner = $this->jobRunnerRegistry->get(get_class($job)); |
110
|
|
|
} catch (JobRunnerNotRegisteredException $e) { |
111
|
|
|
$this->abandonedJobMessageStore->add( |
112
|
|
|
new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString()) |
113
|
|
|
); |
114
|
|
|
$this->logger->alert( |
115
|
|
|
'No JobRunner is registered. Message is abandoned. Rejecting the message.', |
116
|
|
|
['message' => $message->getBody()] |
117
|
|
|
); |
118
|
|
|
$this->jobFlightManager->rejected($message->getMessageId()); |
119
|
|
|
$consumer->reject($message); |
120
|
|
|
return; |
121
|
|
|
} |
122
|
|
|
try { |
123
|
|
|
$jobRunner->run($job); |
124
|
|
View Code Duplication |
if (!is_null($persist) && !$persist()) { |
|
|
|
|
125
|
|
|
$this->logger->info( |
126
|
|
|
'Persistence failed after the job. Requeueing the message.', |
127
|
|
|
['message' => $message->getBody()] |
128
|
|
|
); |
129
|
|
|
$newMessageId = $message->getMessageId() . '+'; |
130
|
|
|
$this->jobFlightManager->requeued($message->getMessageId(), $newMessageId); |
131
|
|
|
$message->setMessageId($newMessageId); |
132
|
|
|
$consumer->reject($message, true); |
133
|
|
|
return; |
134
|
|
|
} |
135
|
|
|
$dependentJobs = $job->dependentJobs(); |
136
|
|
|
if (count($dependentJobs) > 0) { |
137
|
|
|
foreach ($dependentJobs as $dependentJob) { |
138
|
|
|
$this->jobStore->append($dependentJob); |
139
|
|
|
} |
140
|
|
|
} |
141
|
|
|
$this->logger->info('Acknowledging message.', ['message' => $message->getBody()]); |
142
|
|
|
$this->jobFlightManager->acknowledged($message->getMessageId()); |
143
|
|
|
$consumer->acknowledge($message); |
144
|
|
|
} catch (JobFailedException $e) { |
145
|
|
|
if ($job->isExpendable()) { |
146
|
|
|
$this->logger->debug( |
147
|
|
|
'Expendable job failed. Acknowledging and letting it go.', |
148
|
|
|
['message' => $message->getBody()] |
149
|
|
|
); |
150
|
|
|
$this->jobFlightManager->letGo($message->getMessageId()); |
151
|
|
|
$consumer->acknowledge($message); |
152
|
|
|
return; |
153
|
|
|
} |
154
|
|
|
$attempts = $message->getProperty('attempts', 0) + 1; |
155
|
|
|
if ($attempts >= $job->maxAttempts()) { |
156
|
|
|
if (!is_null($clear)) { |
157
|
|
|
$clear(); |
158
|
|
|
} |
159
|
|
|
$this->abandonedJobMessageStore->add( |
160
|
|
|
new AbandonedJobMessage($queueName, $message->getBody(), $e->__toString()) |
161
|
|
|
); |
162
|
|
|
$this->logger->info( |
163
|
|
|
'Rejecting the message reaching the max attempts.', |
164
|
|
|
['message' => $message->getBody()] |
165
|
|
|
); |
166
|
|
|
$this->jobFlightManager->rejected($message->getMessageId()); |
167
|
|
|
$consumer->reject($message); |
168
|
|
View Code Duplication |
if (!is_null($persist) && !$persist()) { |
|
|
|
|
169
|
|
|
$this->logger->info( |
170
|
|
|
'Failed after the job. Requeueing the message.', |
171
|
|
|
['message' => $message->getBody()] |
172
|
|
|
); |
173
|
|
|
$newMessageId = $message->getMessageId() . '+'; |
174
|
|
|
$this->jobFlightManager->requeued($message->getMessageId(), $newMessageId); |
175
|
|
|
$message->setMessageId($newMessageId); |
176
|
|
|
$consumer->reject($message, true); |
177
|
|
|
} |
178
|
|
|
return; |
179
|
|
|
} |
180
|
|
|
$message->setProperty('attempts', $attempts); |
181
|
|
|
if ($job->reattemptDelay() > 0) { |
182
|
|
|
$message = $this->delayMessage($message, $job->reattemptDelay()); |
183
|
|
|
} |
184
|
|
|
if (method_exists($message, 'setMessageDeduplicationId')) { |
185
|
|
|
$message->setMessageDeduplicationId(uniqid()); |
186
|
|
|
} |
187
|
|
|
if (method_exists($message, 'setMessageGroupId')) { |
188
|
|
|
$message->setMessageGroupId(is_null($storedJob->fifoGroupId()) ? uniqid() : $storedJob->fifoGroupId()); |
189
|
|
|
} |
190
|
|
|
$this->logger->info('Requeueing the message.', ['message' => $message->getBody()]); |
191
|
|
|
$newMessageId = $message->getMessageId() . '+'; |
192
|
|
|
$this->jobFlightManager->requeued($message->getMessageId(), $newMessageId); |
193
|
|
|
$message->setMessageId($newMessageId); |
194
|
|
|
$consumer->reject($message, true); |
195
|
|
|
} |
196
|
|
|
} |
197
|
|
|
|
198
|
|
|
/** |
199
|
|
|
* @param PsrMessage $message |
200
|
|
|
* @param int $delay |
201
|
|
|
* @return PsrMessage |
202
|
|
|
*/ |
203
|
|
|
protected function delayMessage(PsrMessage $message, int $delay) |
204
|
|
|
{ |
205
|
|
|
if ($message instanceof SqsMessage) { |
206
|
|
|
$message->setDelaySeconds($delay); |
207
|
|
|
} |
208
|
|
|
|
209
|
|
|
return $message; |
210
|
|
|
} |
211
|
|
|
} |
212
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.