Issues (6)

src/ScheduledQueueToDeliveryQueueWorker.php (1 issue)

1
<?php
2
3
namespace Werkspot\MessageQueue;
4
5
use DateTime;
6
use Psr\Log\LoggerInterface;
7
use Psr\Log\NullLogger;
8
use Throwable;
9
use Werkspot\MessageQueue\DeliveryQueue\ProducerInterface;
10
use Werkspot\MessageQueue\Message\MessageInterface;
11
use Werkspot\MessageQueue\ScheduledQueue\ScheduledQueueServiceInterface;
12
13
final class ScheduledQueueToDeliveryQueueWorker implements ScheduledQueueToDeliveryQueueWorkerInterface
14
{
15
    /**
16
     * @var ScheduledQueueServiceInterface
17
     */
18
    private $scheduledMessageService;
19
20
    /**
21
     * @var ProducerInterface
22
     */
23
    private $deliveryQueueMessageProducer;
24
25
    /**
26
     * @var string
27
     */
28
    private $deliveryQueueName;
29
30
    /**
31
     * @var LoggerInterface
32
     */
33
    private $logger;
34
35 5
    public function __construct(
36
        ScheduledQueueServiceInterface $scheduledMessageService,
37
        ProducerInterface $deliveryQueueMessageProducer,
38
        string $deliveryQueueName,
39
        LoggerInterface $logger = null
40
    ) {
41 5
        $this->scheduledMessageService = $scheduledMessageService;
42 5
        $this->deliveryQueueMessageProducer = $deliveryQueueMessageProducer;
43 5
        $this->deliveryQueueName = $deliveryQueueName;
44 5
        $this->logger = $logger ?? new NullLogger();
45 5
    }
46
47
    public function moveMessageBatch(int $batchSize, callable $afterMessageTransfer = null): int
48
    {
49 4
        $afterMessageTransfer = $afterMessageTransfer ?? function () {
50 4
        };
51 4
        $messageList = $this->scheduledMessageService->findScheduledMessageList($batchSize);
52
53 4
        foreach ($messageList as $message) {
54 3
            $this->moveMessage($message);
55 3
            $afterMessageTransfer();
56
        }
57
58 4
        return count($messageList);
59
    }
60
61 3
    private function moveMessage(MessageInterface $message): void
62
    {
63 3
        $this->logger->info('Moving scheduled message to delivery queue: ' . $this->getLogMessage($message));
64
65
        try {
66 3
            $this->sendToDeliveryQueue($message);
67 3
            $this->scheduledMessageService->unscheduleMessage($message);
68 3
            $this->logger->info('Successfully moved scheduled message to delivery queue: ' . $message->getId());
69 1
        } catch (Throwable $error) {
70 1
            $this->logger->error('Error moving scheduled message ' . $message->getId() . ' to delivery queue: ' . $this->getLogErrorMessage($error));
71
        }
72 3
    }
73
74 3 View Code Duplication
    private function getLogMessage(MessageInterface $message): string
0 ignored issues
show
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
75
    {
76 3
        return $message->getId()
77 3
            . ', ' . $message->getDestination()
78 3
            . ', ' . $message->getDeliverAt()->format(DateTime::ATOM)
79 3
            . ', ' . $message->getCreatedAt()->format(DateTime::ATOM)
80 3
            . ', ' . $message->getUpdatedAt()->format(DateTime::ATOM)
81 3
            . ', ' . $message->getTries()
82 3
            . ', ' . $message->getPriority();
83
    }
84
85 1
    private function getLogErrorMessage(Throwable $error): string
86
    {
87
        return 'An error occurred while trying to move the scheduled message to the delivery queue: '
88 1
            . "\n" . $error->getFile()
89 1
            . ', ' . $error->getLine()
90 1
            . ', ' . $error->getMessage();
91
    }
92
93 3
    private function sendToDeliveryQueue(MessageInterface $message): void
94
    {
95 3
        $this->deliveryQueueMessageProducer->send($message, $this->deliveryQueueName);
96 3
    }
97
}
98