1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Werkspot\MessageQueue; |
4
|
|
|
|
5
|
|
|
use DateTime; |
6
|
|
|
use Psr\Log\LoggerInterface; |
7
|
|
|
use Psr\Log\NullLogger; |
8
|
|
|
use Throwable; |
9
|
|
|
use Werkspot\MessageQueue\DeliveryQueue\ProducerInterface; |
10
|
|
|
use Werkspot\MessageQueue\Message\MessageInterface; |
11
|
|
|
use Werkspot\MessageQueue\ScheduledQueue\ScheduledQueueServiceInterface; |
12
|
|
|
|
13
|
|
|
/**
 * Moves messages found by the scheduled queue service onto the delivery queue.
 *
 * Each message is sent through the delivery queue producer and then
 * unscheduled. A failure while moving a single message is logged and does
 * not stop the rest of the batch.
 */
final class ScheduledQueueToDeliveryQueueWorker implements ScheduledQueueToDeliveryQueueWorkerInterface
{
    /**
     * Source of the scheduled messages; also used to unschedule them once sent.
     *
     * @var ScheduledQueueServiceInterface
     */
    private $scheduledMessageService;

    /**
     * Producer used to send messages to the delivery queue.
     *
     * @var ProducerInterface
     */
    private $deliveryQueueMessageProducer;

    /**
     * Queue name passed to the producer on every send.
     *
     * @var string
     */
    private $deliveryQueueName;

    /**
     * Never null: falls back to a NullLogger when none is injected.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param ScheduledQueueServiceInterface $scheduledMessageService      Service that finds and unschedules messages.
     * @param ProducerInterface              $deliveryQueueMessageProducer Producer that sends to the delivery queue.
     * @param string                         $deliveryQueueName            Name of the delivery queue to send to.
     * @param LoggerInterface|null           $logger                       Optional logger; a NullLogger is used when omitted.
     */
    public function __construct(
        ScheduledQueueServiceInterface $scheduledMessageService,
        ProducerInterface $deliveryQueueMessageProducer,
        string $deliveryQueueName,
        ?LoggerInterface $logger = null
    ) {
        $this->scheduledMessageService = $scheduledMessageService;
        $this->deliveryQueueMessageProducer = $deliveryQueueMessageProducer;
        $this->deliveryQueueName = $deliveryQueueName;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Moves one batch of scheduled messages to the delivery queue.
     *
     * @param int           $batchSize            Passed to the scheduled queue service as the batch size to look up.
     * @param callable|null $afterMessageTransfer Optional callback invoked (without arguments) after each message
     *                                            has been processed, whether or not the move succeeded.
     *
     * @return int Number of messages returned by the scheduled queue service. Messages whose move
     *             failed (and was logged) are included in this count.
     */
    public function moveMessageBatch(int $batchSize, ?callable $afterMessageTransfer = null): int
    {
        // Default to a no-op so the loop below can call the callback unconditionally.
        $afterMessageTransfer = $afterMessageTransfer ?? static function (): void {
        };

        $messageList = $this->scheduledMessageService->findScheduledMessageList($batchSize);

        foreach ($messageList as $message) {
            $this->moveMessage($message);
            $afterMessageTransfer();
        }

        return count($messageList);
    }

    /**
     * Sends a single message to the delivery queue and unschedules it.
     *
     * Any Throwable raised while sending or unscheduling is caught and logged
     * as an error, so one failing message does not abort the batch.
     * NOTE(review): if the send succeeds but unscheduling throws, the message
     * has already been sent while only an error is logged - confirm that the
     * consumers tolerate a possible re-delivery.
     */
    private function moveMessage(MessageInterface $message): void
    {
        $this->logger->info('Moving scheduled message to delivery queue: ' . $this->getLogMessage($message));

        try {
            $this->sendToDeliveryQueue($message);
            $this->scheduledMessageService->unscheduleMessage($message);
            $this->logger->info('Successfully moved scheduled message to delivery queue: ' . $message->getId());
        } catch (Throwable $error) {
            $this->logger->error('Error moving scheduled message ' . $message->getId() . ' to delivery queue: ' . $this->getLogErrorMessage($error));
        }
    }

    /**
     * Builds the comma-separated message summary used in log lines:
     * id, destination, deliverAt, createdAt, updatedAt (all ATOM formatted), tries, priority.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        return implode(', ', [
            $message->getId(),
            $message->getDestination(),
            $message->getDeliverAt()->format(DateTime::ATOM),
            $message->getCreatedAt()->format(DateTime::ATOM),
            $message->getUpdatedAt()->format(DateTime::ATOM),
            $message->getTries(),
            $message->getPriority(),
        ]);
    }

    /**
     * Builds the error description used in log lines: a fixed prefix followed,
     * on a new line, by the file, line and message of the given Throwable.
     */
    private function getLogErrorMessage(Throwable $error): string
    {
        return 'An error occurred while trying to move the scheduled message to the delivery queue: '
            . "\n" . $error->getFile()
            . ', ' . $error->getLine()
            . ', ' . $error->getMessage();
    }

    /**
     * Sends the message through the producer to the configured delivery queue.
     */
    private function sendToDeliveryQueue(MessageInterface $message): void
    {
        $this->deliveryQueueMessageProducer->send($message, $this->deliveryQueueName);
    }
}
98
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.