Completed
Push — master ( 1f2f1a...d37772 )
by Tomas
11:35
created

Dispatcher::emit()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11

Duplication

Lines 11
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 11
loc 11
ccs 0
cts 0
cp 0
rs 9.9
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes;
5
6
use DateTime;
7
use Exception;
8
use Psr\Log\LogLevel;
9
use Psr\Log\LoggerInterface;
10
use Tomaj\Hermes\Driver\DriverInterface;
11
use Tomaj\Hermes\Driver\RestartTrait;
12
use Tomaj\Hermes\Handler\HandlerInterface;
13
use Tomaj\Hermes\Restart\RestartException;
14
use Tomaj\Hermes\Restart\RestartInterface;
15
use Tracy\Debugger;
16
17
class Dispatcher implements DispatcherInterface
18
{
19
    /**
20
     * Dispatcher driver
21
     *
22
     * @var DriverInterface
23
     */
24
    private $driver;
25
26
    /**
27
     * Logger
28
     *
29
     * @var LoggerInterface
30
     */
31
    private $logger;
32
33
    /**
34
     * Restart
35
     *
36
     * @var RestartInterface
37
     */
38
    private $restart;
39
40
    /**
41
     * All registered handlers
42
     *
43
     * @var HandlerInterface[][]
44
     */
45
    private $handlers = [];
46
47
    /**
48
     * @var DateTime
49
     */
50
    private $startTime;
51
52
    /**
53
     * Create new Dispatcher
54
     *
55
     * @param DriverInterface $driver
56
     * @param LoggerInterface|null $logger
57 18
     * @param RestartInterface|null $restart
58
     */
59 18
    public function __construct(DriverInterface $driver, LoggerInterface $logger = null, RestartInterface $restart = null)
60 18
    {
61 18
        $this->driver = $driver;
62 18
        $this->logger = $logger;
63 18
        $this->restart = $restart;
64
        $this->startTime = new DateTime();
65
66
67
        // check if driver use RestartTrait
68
        if ($restart && method_exists($this->driver, 'setRestart')) {
69
            $this->driver->setRestart($restart);
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Tomaj\Hermes\Driver\DriverInterface as the method setRestart() does only exist in the following implementations of said interface: Tomaj\Hermes\Driver\AmazonSqsDriver, Tomaj\Hermes\Driver\LazyRabbitMqDriver, Tomaj\Hermes\Driver\RedisSetDriver.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
70
        }
71
    }
72
73
    /**
74
     * @param MessageInterface $message
75
     * @return DispatcherInterface
76
     * @deprecated - use Emitter::emit method instead
77
     */
78 View Code Duplication
    public function emit(MessageInterface $message): DispatcherInterface
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
79
    {
80
        $this->driver->send($message);
81
82
        $this->log(
83
            LogLevel::INFO,
84
            "Dispatcher send message #{$message->getId()} to driver " . get_class($this->driver),
85
            $this->messageLoggerContext($message)
86
        );
87
        return $this;
88
    }
89
90
    /**
91
     * Basic method for background job to star listening.
92 18
     *
93 18
     * This method hook to driver wait() method and start listening events.
94 18
     * Method is blocking, so when you call it all processing will stop.
95 18
     * WARNING! Don't use it on web server calls. Run it only with cli.
96 18
     *
97 12
     * @return void
98
     */
99 18
    public function handle(): void
100
    {
101 18
        try {
102 3
            $this->driver->wait(function (MessageInterface $message) {
103
                $this->log(
104
                    LogLevel::INFO,
105 15
                    "Start handle message #{$message->getId()} ({$message->getType()})",
106 18
                    $this->messageLoggerContext($message)
107 13
                );
108 3
109 2
                $result = $this->dispatch($message);
110
111
                if ($this->restart !== null && $this->restart->shouldRestart($this->startTime)) {
112 18
                    throw new RestartException('Restart');
113
                }
114
115
                return $result;
116
            });
117
        } catch (RestartException $e) {
118
            $this->log(LogLevel::NOTICE, 'Exiting hermes dispatcher - restart');
119
        } catch (Exception $exception) {
120
            if (Debugger::isEnabled()) {
121 18
                Debugger::log($exception, Debugger::EXCEPTION);
122
            }
123 18
        }
124
    }
125 18
126 6
    /**
127
     * Dispatch message
128
     *
129 15
     * @param MessageInterface $message
130
     *
131 15
     * @return bool
132 15
     */
133
    private function dispatch(MessageInterface $message): bool
134 15
    {
135 6
        $type = $message->getType();
136 4
137 10
        if (!$this->hasHandlers($type)) {
138
            return true;
139 15
        }
140
141
        $result = true;
142
143
        foreach ($this->handlers[$type] as $handler) {
144
            $handlerResult = $this->handleMessage($handler, $message);
145
146
            if ($result && !$handlerResult) {
147
                $result = false;
148
            }
149
        }
150 15
151
        return $result;
152
    }
153 15
154
    /**
155
     * Handle given message with given handler
156
     *
157
     * @param HandlerInterface $handler
158 15
     * @param MessageInterface $message
159
     *
160 12
     * @return bool
161 12
     */
162 12
    private function handleMessage(HandlerInterface $handler, MessageInterface $message): bool
163 12
    {
164 8
        // check if handler implements Psr\Log\LoggerAwareInterface (you can use \Psr\Log\LoggerAwareTrait)
165 11
        if ($this->logger !== null && method_exists($handler, 'setLogger')) {
166 3
            $handler->setLogger($this->logger);
0 ignored issues
show
Bug introduced by
The method setLogger() does not seem to exist on object<Tomaj\Hermes\Handler\HandlerInterface>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
167 3
        }
168 3
169 3
        try {
170 2
            $result = $handler->handle($message);
171 3
172 3
            $this->log(
173
                LogLevel::INFO,
174 15
                "End handle message #{$message->getId()} ({$message->getType()})",
175
                $this->messageLoggerContext($message)
176
            );
177
        } catch (Exception $e) {
178
            $this->log(
179
                LogLevel::ERROR,
180
                "Handler " . get_class($handler) . " throws exception - {$e->getMessage()}",
181
                ['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e]
182
            );
183
            if (Debugger::isEnabled()) {
184 18
                Debugger::log($e, Debugger::EXCEPTION);
185
            }
186 18
187
            $this->retryMessage($message, $handler);
188
189
            $result = false;
190
        }
191
        return $result;
192 18
    }
193
194 18
    /**
195 18
     * Helper function for sending retrying message back to driver
196 12
     *
197
     * @param MessageInterface $message
198 18
     * @param HandlerInterface $handler
199 18
     */
200
    private function retryMessage(MessageInterface $message, HandlerInterface $handler): void
201
    {
202
        if (method_exists($handler, 'canRetry') && method_exists($handler, 'maxRetry')) {
203
            if ($message->getRetries() < $handler->maxRetry()) {
204
                $executeAt = $this->nextRetry($message);
205
                $newMessage = new Message($message->getType(), $message->getPayload(), $message->getId(), $message->getCreated(), $executeAt, $message->getRetries() + 1);
206
                $this->driver->send($newMessage);
207
            }
208 18
        }
209
    }
210
211 18
    /**
212 18
     * Calculate next retry
213 18
     *
214 18
     * Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry)
215 12
     *
216
     * @param MessageInterface $message
217
     * @return float
218
     */
219
    private function nextRetry(MessageInterface $message): float
220
    {
221
        return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1));
222
    }
223
224
    /**
225
     * Check if actual dispatcher has handler for given type
226
     *
227 18
     * @param string $type
228
     *
229 18
     * @return bool
230 3
     */
231 2
    private function hasHandlers(string $type): bool
232 18
    {
233
        return isset($this->handlers[$type]) && count($this->handlers[$type]) > 0;
234
    }
235
236
    /**
237
     * {@inheritdoc}
238
     */
239
    public function registerHandler(string $type, HandlerInterface $handler): DispatcherInterface
240
    {
241
        if (!isset($this->handlers[$type])) {
242
            $this->handlers[$type] = [];
243
        }
244
245
        $this->handlers[$type][] = $handler;
246
        return $this;
247
    }
248
249
    /**
250
     * {@inheritdoc}
251
     */
252
    public function registerHandlers(string $type, array $handlers): DispatcherInterface
253
    {
254
        foreach ($handlers as $handler) {
255
            $this->registerHandler($type, $handler);
256
        }
257
        return $this;
258
    }
259
260
    /**
261
     * Serialize message to logger context
262
     *
263
     * @param MessageInterface $message
264
     *
265
     * @return array
266
     */
267 View Code Duplication
    private function messageLoggerContext(MessageInterface $message): array
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
268
    {
269
        return [
270
            'id' => $message->getId(),
271
            'created' => $message->getCreated(),
272
            'type' => $message->getType(),
273
            'payload' => $message->getPayload(),
274
            'retries' => $message->getRetries(),
275
            'execute_at' => $message->getExecuteAt(),
276
        ];
277
    }
278
279
    /**
280
     * Interal log method wrapper
281
     *
282
     * @param mixed $level
283
     * @param string $message
284
     * @param array $context
285
     *
286
     * @return void
287
     */
288
    private function log($level, string $message, array $context = array()): void
289
    {
290
        if ($this->logger !== null) {
291
            $this->logger->log($level, $message, $context);
292
        }
293
    }
294
}
295