Test Failed
Pull Request — master (#7)
by Herberto
08:05 queued 03:58
created

MessageHandler::getPayloadType()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 5

Duplication

Lines 11
Ratio 100 %

Importance

Changes 0
Metric Value
dl 11
loc 11
c 0
b 0
f 0
rs 9.4285
cc 3
eloc 5
nc 3
nop 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Werkspot\MessageQueue\DeliveryQueue;
6
7
use DateTime;
8
use ErrorException;
9
use Exception;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use Throwable;
13
use Werkspot\MessageQueue\Message\MessageInterface;
14
15
/**
 * Handles one queued message at a time: hands it to the delivery service and, when delivery fails,
 * marks the message as failed and persists that state through the persistence client.
 */
final class MessageHandler implements MessageHandlerInterface
{
    /**
     * @var MessageDeliveryServiceInterface
     */
    private $messageDeliveryService;

    /**
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * The message currently being handled, or null when no message is in flight.
     *
     * @var MessageInterface|null
     */
    private $message;

    /**
     * @param LoggerInterface|null $logger Optional logger; a NullLogger is used when none is given.
     */
    public function __construct(
        MessageDeliveryServiceInterface $messageDeliveryService,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->messageDeliveryService = $messageDeliveryService;
        $this->persistenceClient = $persistenceClient;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Delivers the given message, routing any Throwable raised during delivery to onError().
     *
     * @throws Exception When onError() fails to persist the failed message.
     */
    public function handle(MessageInterface $message): void
    {
        // There can be an edge case where a fatal error takes us out of this flow while the message is still
        // registered as being handled. In that case the flow is broken anyway, as handle() should normally
        // handle all cases sanely. If we're in the shutdown handler, the script is being killed anyway, so we
        // don't care anymore.
        $this->initiateHandling($message);

        try {
            $this->process();
        } catch (Throwable $e) {
            $this->onError($e);
        } finally {
            $this->terminateHandling();
            // We clear the persistenceClient (entity manager), to ensure each message is handled without stale
            // entities. Because the workers run for a long time, and we have multiple workers running, we need to
            // prevent having old entities cached in memory when other workers already updated the entity to a new
            // state. Otherwise we would use the old, out-dated state and overwrite whatever happened after it in
            // the other workers.
            $this->persistenceClient->reset();
        }
    }

    /**
     * Tells whether a message is currently in flight, i.e. handle() has started and not yet finished.
     */
    public function isHandlingMessage(): bool
    {
        return $this->message !== null;
    }

    /**
     * Reports the last PHP error as a failure of the in-flight message, if there is one.
     *
     * NOTE(review): presumably registered as a shutdown function by the worker; confirm against the caller.
     *
     * @throws Exception When onError() fails to persist the failed message.
     */
    public function shutdown(): void
    {
        $error = error_get_last();

        if ($error !== null && $this->isHandlingMessage()) {
            // The error is always reported with severity E_ERROR, regardless of the type PHP recorded for it.
            $this->onError(new ErrorException($error['message'], 0, E_ERROR, $error['file'], $error['line']));
        }
    }

    /**
     * Rolls back the open transaction, marks the in-flight message as failed and persists it.
     *
     * When no usable message is in flight the error is only logged: there is nothing to mark as failed, and
     * calling methods on a value that is not a MessageInterface would itself raise a fatal error.
     *
     * @throws Exception When persisting the failed message fails; the message is recorded as undeliverable
     *                   before the exception is rethrown.
     */
    public function onError(Throwable $error): void
    {
        // If we have a fatal error, we were not able to rollback the transaction, so we will need to do it here,
        // otherwise we cannot queue the command below, as it will be in a transaction that will never be committed
        $this->persistenceClient->rollbackTransaction();

        $message = $this->message;

        if (!$message instanceof MessageInterface) {
            // This happens when no message is being handled, or when we have a serialization problem in the queue
            // for whatever reason (e.g. a class was moved, or renamed) and end up with a __PHP_Incomplete_Class.
            // Calling any method on such a value would trigger a fatal error and kill the worker. Although
            // supervisor will restart it, it only does so for a limited number of tries before giving up, and
            // we would end up with a queue that is stuck. So all we can safely do here is log the problem.
            $this->logger->error(
                sprintf(
                    'Cannot deliver queued message because it is not a MessageInterface (%s): %s',
                    is_object($message) ? get_class($message) : gettype($message),
                    $error->getMessage()
                )
            );

            return;
        }

        $message->fail($error);

        try {
            $this->persistenceClient->persist($message);
        } catch (Exception $exception) {
            // It can happen that we have some problem with persisting, most likely a unique constraint violation
            // because of a race condition or because we processed a message twice. In that case persist it as an
            // undeliverable message so it can be monitored and fixed.
            $this->persistenceClient->persistUndeliverableMessage($message, $exception->getMessage());

            throw $exception;
        }
    }

    /**
     * Logs the in-flight message and passes it to the delivery service.
     */
    private function process(): void
    {
        $this->logger->info('Delivering message: ' . $this->getLogMessage($this->message));

        $this->messageDeliveryService->deliver($this->message);
    }

    /**
     * Builds a comma-separated, single-line summary of the message for logging.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        return $message->getId()
            . ', ' . $message->getDestination()
            . ', ' . $message->getDeliverAt()->format(DateTime::ATOM)
            . ', ' . $message->getCreatedAt()->format(DateTime::ATOM)
            . ', ' . $message->getUpdatedAt()->format(DateTime::ATOM)
            . ', ' . $message->getTries()
            . ', ' . $message->getPriority()
            . ', ' . $this->getPayloadDescription($message);
    }

    /**
     * Describes the payload for logging: the class name for objects, 'array' for arrays, and the string cast
     * of the payload for anything else.
     */
    private function getPayloadDescription(MessageInterface $message): string
    {
        // Read the payload once instead of calling the getter for every check.
        $payload = $message->getPayload();

        if (is_object($payload)) {
            return get_class($payload);
        }

        if (is_array($payload)) {
            return 'array';
        }

        return (string) $payload;
    }

    /**
     * Registers the message as the one currently being handled.
     */
    private function initiateHandling(MessageInterface $message): void
    {
        $this->message = $message;
    }

    /**
     * Clears the in-flight message so isHandlingMessage() reports false again.
     */
    private function terminateHandling(): void
    {
        $this->message = null;
    }
}
167