MessageHandler::getPayloadDescription()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 5

Duplication

Lines 11
Ratio 100 %

Code Coverage

Tests 4
CRAP Score 3.3332

Importance

Changes 0

Metric  Value
dl      11
loc     11
ccs     4
cts     6
cp      0.6667
rs      9.4285
c       0
b       0
f       0
cc      3
eloc    5
nc      3
nop     1
crap    3.3332
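The CRAP score in the table can be reproduced from two of its other rows, assuming `cc` is the cyclomatic complexity and `cp` is the covered fraction of the method (the report does not spell these abbreviations out). The sketch below uses the commonly published CRAP formula, complexity squared times the uncovered fraction cubed, plus complexity. The function name `crapScore` is made up for this illustration.

```php
<?php

declare(strict_types=1);

/**
 * CRAP score: complexity^2 * (1 - coverage)^3 + complexity.
 *
 * @param int   $complexity cyclomatic complexity (assumed to be "cc" in the table)
 * @param float $coverage   covered fraction between 0.0 and 1.0 (assumed to be "cp")
 */
function crapScore(int $complexity, float $coverage): float
{
    return $complexity ** 2 * (1.0 - $coverage) ** 3 + $complexity;
}

// Values taken from the metric table: cc = 3, cp = 0.6667.
echo round(crapScore(3, 0.6667), 4), PHP_EOL; // prints 3.3332
```

With full coverage the second factor vanishes and the score equals the complexity (3 here). With no coverage at all it would be 3² + 3 = 12.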
<?php

declare(strict_types=1);

namespace Werkspot\MessageQueue\DeliveryQueue;

use DateTime;
use ErrorException;
use Exception;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;
use Werkspot\MessageQueue\Message\MessageInterface;

final class MessageHandler implements MessageHandlerInterface
{
    /**
     * @var MessageDeliveryServiceInterface
     */
    private $messageDeliveryService;

    /**
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var MessageInterface|null
     */
    private $message;

    public function __construct(
        MessageDeliveryServiceInterface $messageDeliveryService,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->messageDeliveryService = $messageDeliveryService;
        $this->persistenceClient = $persistenceClient;
        $this->logger = $logger ?? new NullLogger();
    }

    public function handle(MessageInterface $message): void
    {
        // In an edge case a fatal error or exception can take us out of this flow while isHandlingMessage() keeps
        // returning true, but in that case the flow is broken anyway, as handle() should normally handle all cases
        // sanely. If we are in the shutdown handler, the script is being killed anyway, so it no longer matters.
        $this->initiateHandling($message);

        try {
            $this->process();
        } catch (Throwable $e) {
            $this->onError($e);
        } finally {
            $this->terminateHandling();
            // We clear the persistenceClient (entity manager) to ensure each message is handled without stale
            // entities. The workers run for a long time and several of them run in parallel, so we must not keep
            // entities cached in memory after another worker has already updated them to a new state.
            // Otherwise we would use the old, outdated state and overwrite whatever happened after it in the other
            // workers.
            $this->persistenceClient->reset();
        }
    }

    public function isHandlingMessage(): bool
    {
        return $this->message !== null;
    }

    public function shutdown(): void
    {
        $error = error_get_last();

        if ($error !== null && $this->isHandlingMessage()) {
            $this->onError(new ErrorException($error['message'], 0, E_ERROR, $error['file'], $error['line']));
        }
    }

    public function onError(Throwable $error): void
    {
        // If we have a fatal error, we were not able to roll back the transaction, so we need to do it here.
        // Otherwise we cannot queue the command below, as it would be in a transaction that is never committed.
        $this->persistenceClient->rollbackTransaction();

        if (!is_subclass_of($this->message, MessageInterface::class)) {
            // This happens when we have a serialization problem in the queue for whatever reason (e.g. a class
            // was moved or renamed). In that case it is not a Command, but a PHP_Incomplete_Class.
            // If we called handleFailure() for an Incomplete_Class we would get a fatal error. We do not want
            // that, because the worker would die. Although supervisor restarts it, it only does so for a limited
            // number of tries before giving up, and we would end up with a queue that is stuck.
            // So in this case we cannot log the error with the current logic. It is on the todo list to do an
            // update in the database directly here, e.g. 'update QueuedCommand set error = ... where id = .. ',
            // but we are working on the project restructure with a hard deadline and prioritize that above this.
            // At least the fix is there, just not the logging, which we will do later.
            $logMessage = sprintf(
                'Cannot deliver queued message because it is not a MessageInterface %s: %s',
                $this->message->getId(),
                $error->getMessage()
            );

            $this->logger->error($logMessage);
            $this->message->fail($error);
            $this->persistenceClient->persistUndeliverableMessage($this->message, $logMessage);
        } else {
            $this->message->fail($error);

            try {
                $this->persistenceClient->persist($this->message);
            } catch (Exception $exception) {
                // Persisting can fail, most likely on a unique constraint because of a race condition or because
                // we processed a message twice. In that case persist it in the UnRequeueableMessage table so it
                // can be monitored and fixed.
                $this->persistenceClient->persistUndeliverableMessage($this->message, $exception->getMessage());

                throw $exception;
            }
        }
    }

    private function process(): void
    {
        $this->logger->info('Delivering message: ' . $this->getLogMessage($this->message));

        $this->messageDeliveryService->deliver($this->message);
    }

    private function getLogMessage(MessageInterface $message): string
    {
        return $message->getId()
            . ', ' . $message->getDestination()
            . ', ' . $message->getDeliverAt()->format(DateTime::ATOM)
            . ', ' . $message->getCreatedAt()->format(DateTime::ATOM)
            . ', ' . $message->getUpdatedAt()->format(DateTime::ATOM)
            . ', ' . $message->getTries()
            . ', ' . $message->getPriority()
            . ', ' . $this->getPayloadDescription($message);
    }

    private function getPayloadDescription(MessageInterface $message): string
    {
        if (is_object($message->getPayload())) {
            return get_class($message->getPayload());
        }

        if (is_array($message->getPayload())) {
            return 'array';
        }

        return (string) $message->getPayload();
    }

    private function initiateHandling(MessageInterface $message): void
    {
        $this->message = $message;
    }

    private function terminateHandling(): void
    {
        $this->message = null;
    }
}
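The report flags getPayloadDescription() as 100 % duplicated elsewhere in the project. One possible way to remove the duplication is to move the logic into a single shared helper that both call sites use. The sketch below is only an illustration: `describePayload` is a hypothetical name that does not exist in the analyzed package, and it takes the payload value directly so that getPayload() is called only once.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical shared helper, not part of the analyzed package.
 *
 * @param mixed $payload whatever MessageInterface::getPayload() returned
 */
function describePayload($payload): string
{
    if (is_object($payload)) {
        // Objects are described by their class name.
        return get_class($payload);
    }

    if (is_array($payload)) {
        return 'array';
    }

    // Scalars and null are cast to string, as in the original method.
    return (string) $payload;
}

echo describePayload(new ArrayObject()), PHP_EOL; // ArrayObject
echo describePayload(['id' => 1]), PHP_EOL;        // array
echo describePayload(42), PHP_EOL;                 // 42
```

Under this assumption, each duplicated method body would shrink to a single call such as `describePayload($message->getPayload())`.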
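shutdown() turns the array returned by error_get_last() into an ErrorException so that onError() can treat a fatal error like any other Throwable. The conversion can be sketched in isolation as follows. `lastErrorAsException` is a hypothetical helper name, and unlike the listing, which always passes E_ERROR, this variant forwards the reported `type` as the severity. That is an assumption about what might be preferable, not what the package does.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: convert the result of error_get_last() into an exception.
 *
 * @param array|null $error array with the keys type, message, file and line, or null
 */
function lastErrorAsException(?array $error): ?ErrorException
{
    if ($error === null) {
        // No error was recorded, so there is nothing to convert.
        return null;
    }

    return new ErrorException(
        $error['message'],
        0,
        $error['type'], // assumption: keep the real severity instead of a fixed E_ERROR
        $error['file'],
        $error['line']
    );
}

// Hand-built sample in the shape error_get_last() returns.
$exception = lastErrorAsException([
    'type' => E_WARNING,
    'message' => 'Example warning',
    'file' => '/tmp/example.php',
    'line' => 12,
]);

echo $exception->getMessage(), ' at ', $exception->getFile(), ':', $exception->getLine(), PHP_EOL;
```

For shutdown() to run at all, a worker would presumably register it with `register_shutdown_function()`. That wiring is not part of the listing.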