Completed: push to master (e04e5a...e45d07), created by Tomas, 14:43

Dispatcher::handleMessage()   B

Complexity:    Conditions 7, Paths 26
Size:          Total Lines 37
Duplication:   Lines 0, Ratio 0 %
Code Coverage: Tests 18, CRAP Score 7.049
Importance:    Changes 0

Metric  Value
dl      0
loc     37
ccs     18
cts     20
cp      0.9
rs      8.3946
c       0
b       0
f       0
cc      7
nc      26
nop     2
crap    7.049

<?php
declare(strict_types=1);

namespace Tomaj\Hermes;

use Exception;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Tomaj\Hermes\Handler\HandlerInterface;
use Tomaj\Hermes\Driver\DriverInterface;
use Tomaj\Hermes\Restart\RestartException;
use Tomaj\Hermes\Restart\RestartInterface;
use Tracy\Debugger;
use DateTime;

class Dispatcher implements DispatcherInterface
{
    /**
     * Dispatcher driver
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Logger
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Restart
     *
     * @var RestartInterface
     */
    private $restart;

    /**
     * All registered handlers
     *
     * @var HandlerInterface[][]
     */
    private $handlers = [];

    /**
     * @var DateTime
     */
    private $startTime;

    /**
     * Create new Dispatcher
     *
     * @param DriverInterface $driver
     * @param LoggerInterface|null $logger optional; logging is skipped when null
     * @param RestartInterface|null $restart optional; restart checks are skipped when null
     */
    public function __construct(DriverInterface $driver, ?LoggerInterface $logger = null, ?RestartInterface $restart = null)
    {
        $this->driver = $driver;
        $this->logger = $logger;
        $this->restart = $restart;
        $this->startTime = new DateTime();
    }

    /**
     * @deprecated - use Emitter::emit method instead
     */
    public function emit(MessageInterface $message): DispatcherInterface
    {
        // Analyzer note (duplication): this method seems to be duplicated in the project.
        $this->driver->send($message);

        $this->log(
            LogLevel::INFO,
            "Dispatcher send message #{$message->getId()} to driver " . get_class($this->driver),
            $this->messageLoggerContext($message)
        );
        return $this;
    }

    /**
     * Basic method for a background job to start listening.
     *
     * This method hooks into the driver's wait() method and starts listening for events.
     * The method is blocking, so once you call it, all other processing stops.
     * WARNING! Don't use it in web server calls. Run it only from the CLI.
     *
     * @return void
     */
    public function handle(): void
    {
        try {
            $this->driver->wait(function (MessageInterface $message) {
                $this->log(
                    LogLevel::INFO,
                    "Start handle message #{$message->getId()} ({$message->getType()})",
                    $this->messageLoggerContext($message)
                );

                $result = $this->dispatch($message);

                if ($this->restart && $this->restart->shouldRestart($this->startTime)) {
                    throw new RestartException('Restart');
                }

                return $result;
            });
        } catch (RestartException $e) {
            $this->log(LogLevel::NOTICE, 'Exiting hermes dispatcher - restart');
        } catch (Exception $exception) {
            if (Debugger::isEnabled()) {
                Debugger::log($exception, Debugger::EXCEPTION);
            }
        }
    }

    /**
     * Dispatch message
     *
     * @param MessageInterface $message
     *
     * @return bool
     */
    private function dispatch(MessageInterface $message): bool
    {
        $type = $message->getType();

        if (!$this->hasHandlers($type)) {
            return true;
        }

        $result = true;

        foreach ($this->handlers[$type] as $handler) {
            $handlerResult = $this->handleMessage($handler, $message);

            if ($result && !$handlerResult) {
                $result = false;
            }
        }

        return $result;
    }

    /**
     * Handle given message with given handler
     *
     * @param HandlerInterface $handler
     * @param MessageInterface $message
     *
     * @return bool
     */
    private function handleMessage(HandlerInterface $handler, MessageInterface $message): bool
    {
        // check if handler implements Psr\Log\LoggerAwareInterface (you can use \Psr\Log\LoggerAwareTrait)
        // Analyzer note (bug): setLogger() does not seem to exist on HandlerInterface;
        // the method_exists() guard below is what makes the call safe at runtime.
        if ($this->logger && method_exists($handler, 'setLogger')) {
            $handler->setLogger($this->logger);
        }

        try {
            $result = $handler->handle($message);

            $this->log(
                LogLevel::INFO,
                "End handle message #{$message->getId()} ({$message->getType()})",
                $this->messageLoggerContext($message)
            );
        } catch (Exception $e) {
            $this->log(
                LogLevel::ERROR,
                "Handler " . get_class($handler) . " throws exception - {$e->getMessage()}",
                ['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e]
            );
            if (Debugger::isEnabled()) {
                Debugger::log($e, Debugger::EXCEPTION);
            }

            // Guard both methods: maxRetry() is called below, so it must exist as well as canRetry().
            if (method_exists($handler, 'canRetry') && method_exists($handler, 'maxRetry')) {
                if ($message->getRetries() < $handler->maxRetry()) {
                    // Re-queue a copy of the message with an incremented retry counter.
                    $executeAt = $this->nextRetry($message);
                    $newMessage = new Message(
                        $message->getType(),
                        $message->getPayload(),
                        $message->getId(),
                        $message->getCreated(),
                        $executeAt,
                        $message->getRetries() + 1
                    );
                    $this->driver->send($newMessage);
                }
            }

            $result = false;
        }
        return $result;
    }

    /**
     * Calculate next retry
     *
     * Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry)
     *
     * @param MessageInterface $message
     * @return float
     */
    private function nextRetry(MessageInterface $message): float
    {
        return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1));
    }

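    // Worked example for nextRetry(): a sketch derived only from the formula in that
    // method, not from measured data. The delay added to the current time is
    //   retries^4 + 15 + rand(1, 30) * (retries + 1)   seconds,
    // so the possible range grows with each attempt:
    //   retries = 0:   0 + 15 + [1..30] * 1  ->   16 to  45 s
    //   retries = 1:   1 + 15 + [1..30] * 2  ->   18 to  76 s
    //   retries = 2:  16 + 15 + [1..30] * 3  ->   34 to 121 s
    //   retries = 3:  81 + 15 + [1..30] * 4  ->  100 to 216 s
    // The random term spreads out retries of messages that failed at the same moment.
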
    /**
     * Check if this dispatcher has at least one handler for the given type
     *
     * @param string $type
     *
     * @return bool
     */
    private function hasHandlers(string $type): bool
    {
        return isset($this->handlers[$type]) && count($this->handlers[$type]) > 0;
    }

    /**
     * {@inheritdoc}
     */
    public function registerHandler(string $type, HandlerInterface $handler): DispatcherInterface
    {
        if (!isset($this->handlers[$type])) {
            $this->handlers[$type] = [];
        }

        $this->handlers[$type][] = $handler;
        return $this;
    }

    /**
     * Serialize message to logger context
     *
     * @param MessageInterface $message
     *
     * @return array
     */
    private function messageLoggerContext(MessageInterface $message): array
    {
        // Analyzer note (duplication): this method seems to be duplicated in the project.
        return [
            'id' => $message->getId(),
            'created' => $message->getCreated(),
            'type' => $message->getType(),
            'payload' => $message->getPayload(),
        ];
    }

    /**
     * Internal log method wrapper; does nothing when no logger is set
     *
     * @param mixed $level
     * @param string $message
     * @param array $context
     *
     * @return void
     */
    private function log($level, string $message, array $context = array()): void
    {
        if ($this->logger) {
            $this->logger->log($level, $message, $context);
        }
    }
}