MessageProcessor   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 49
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 6
eloc 30
c 0
b 0
f 0
dl 0
loc 49
ccs 29
cts 29
cp 1
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A processMessage() 0 29 4
A __construct() 0 4 1
A process() 0 7 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\Queue;
6
7
use Antidot\Queue\Event\MessageProcessed;
8
use Antidot\Queue\Event\MessageReceived;
9
use Enqueue\Consumption\Result;
10
use Interop\Queue\Context;
11
use Interop\Queue\Message;
12
use Interop\Queue\Processor;
13
use Psr\Container\ContainerInterface;
14
use Psr\EventDispatcher\EventDispatcherInterface;
15
use Throwable;
16
17
class MessageProcessor implements Processor
18
{
19
    private const ERROR_MESSAGE_TEMPLATE = 'Error with message: %s. In file %s in line %s. Failing message: %s.';
20
    private ContainerInterface $container;
21
    private EventDispatcherInterface $dispatcher;
22
23 5
    public function __construct(ContainerInterface $container, EventDispatcherInterface $dispatcher)
24
    {
25 5
        $this->container = $container;
26 5
        $this->dispatcher = $dispatcher;
27 5
    }
28
29 4
    public function process(Message $message, Context $context): Result
30
    {
31 4
        $this->dispatcher->dispatch(MessageReceived::occur($message));
32 4
        $result = $this->processMessage($message);
33 4
        $this->dispatcher->dispatch(MessageProcessed::occur($result));
34
35 4
        return $result;
36
    }
37 4
    public function processMessage(Message $message): Result
38
    {
39 4
        $jobPayload = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $jobPayload is dead and can be removed.
Loading history...
40
        try {
41 4
            $jobPayload = JobPayload::createFromMessage($message);
42 3
            if ($this->container->has($jobPayload->type())) {
43 2
                $action = $this->container->get($jobPayload->type());
44 2
                $action($jobPayload);
45 1
                return Result::ack(sprintf('Message type "%s" routed to action.', $jobPayload->type()));
46
            }
47
48 1
            return Result::ack(
49 1
                sprintf('No action matched for message type "%s". Message omitted.', $jobPayload->type())
50
            );
51 2
        } catch (Throwable $exception) {
52 2
            $errorMessage = sprintf(
53 2
                self::ERROR_MESSAGE_TEMPLATE,
54 2
                $exception->getMessage(),
55 2
                $exception->getFile(),
56 2
                $exception->getLine(),
57 2
                json_encode($message, JSON_THROW_ON_ERROR)
58
            );
59
60 2
            if (null === $jobPayload) {
61 1
                return Result::reject(sprintf('Invalid message format given and caused an error: %s', $errorMessage));
62
            }
63
64 1
            return Result::reject(
65 1
                sprintf('Error occurred processing "%s" message: %s', $jobPayload->type(), $errorMessage)
66
            );
67
        }
68
    }
69
}
70