MessageProcessor   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 98
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 9
lcom 1
cbo 7
dl 0
loc 98
ccs 36
cts 36
cp 1
rs 10
c 0
b 0
f 0

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 1
B process() 0 55 8
1
<?php
2
namespace PSB\Core\Transport\RabbitMq;
3
4
5
use PSB\Core\EndpointControlToken;
6
use PSB\Core\Exception\CriticalErrorException;
7
use PSB\Core\Transport\PushContext;
8
use PSB\Core\Transport\PushPipe;
9
use PSB\Core\Transport\ReceiveCancellationToken;
10
11
/**
 * Handles a single delivery received from a RabbitMQ queue: extracts the
 * message id and headers, pushes the message through the push pipe and then
 * settles the delivery (ack or reject-with-requeue) on the queue.
 */
class MessageProcessor
{
    /**
     * @var BrokerModel
     */
    private $brokerModel;

    /**
     * @var RoutingTopology
     */
    private $routingTopology;

    /**
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * @param BrokerModel      $brokerModel      Passed to the routing topology when forwarding to the error queue.
     * @param RoutingTopology  $routingTopology  Used to forward unreadable messages to the error queue.
     * @param MessageConverter $messageConverter Used to extract the message id and headers from an envelope.
     */
    public function __construct(
        BrokerModel $brokerModel,
        RoutingTopology $routingTopology,
        MessageConverter $messageConverter
    ) {
        $this->brokerModel = $brokerModel;
        $this->routingTopology = $routingTopology;
        $this->messageConverter = $messageConverter;
    }

    /**
     * Processes one delivery and settles it on the queue.
     *
     * Flow:
     *  - If the message id or headers cannot be retrieved, the raw body and
     *    headers are forwarded to $errorQueue and the message is not pushed.
     *  - Otherwise the message is pushed through $pushPipe.
     *  - Afterwards the delivery is rejected with requeue when cancellation
     *    was requested on $cancellationToken, and acknowledged otherwise.
     *  - Any throwable other than CriticalErrorException raised along the way
     *    causes the delivery to be rejected with requeue.
     *
     * @param \AMQPEnvelope            $envelope
     * @param \AMQPQueue               $queue
     * @param PushPipe                 $pushPipe
     * @param string                   $errorQueue
     * @param ReceiveCancellationToken $cancellationToken
     * @param EndpointControlToken     $endpointControlToken
     *
     * @return bool false when a shutdown was requested on $endpointControlToken, true otherwise
     *
     * @throws CriticalErrorException rethrown untouched; the delivery is neither acked nor rejected here
     */
    public function process(
        \AMQPEnvelope $envelope,
        \AMQPQueue $queue,
        PushPipe $pushPipe,
        $errorQueue,
        ReceiveCancellationToken $cancellationToken,
        EndpointControlToken $endpointControlToken
    ) {
        try {
            $messageId = '';
            $headers = [];
            $pushMessage = false;
            try {
                $messageId = $this->messageConverter->retrieveMessageId($envelope);
                $headers = $this->messageConverter->retrieveHeaders($envelope);
                $pushMessage = true;
            } catch (\Throwable $t) {
                // The envelope could not be converted: forward it as-is to the
                // error queue instead of pushing it through the pipeline.
                $this->routingTopology->sendToQueue(
                    $this->brokerModel,
                    $errorQueue,
                    $envelope->getBody(),
                    ['headers' => $envelope->getHeaders()]
                );
            }

            if ($pushMessage) {
                // Only substitute an empty string when there is no body at all.
                // A truthiness test (?:) would also discard a valid body of "0".
                $body = $envelope->getBody();
                if (!is_string($body)) {
                    $body = '';
                }

                $pushPipe->push(
                    new PushContext(
                        $messageId,
                        $headers,
                        $body,
                        $cancellationToken,
                        $endpointControlToken
                    )
                );
            }

            if ($cancellationToken->isCancellationRequested()) {
                // Processing was cancelled: hand the message back to the broker.
                $queue->reject($envelope->getDeliveryTag(), AMQP_REQUEUE);
            } else {
                $queue->ack($envelope->getDeliveryTag());
            }
        } catch (CriticalErrorException $e) {
            // Critical errors must propagate to the caller unchanged.
            throw $e;
        } catch (\Throwable $t) {
            // Any other failure: requeue the delivery so it can be retried.
            $queue->reject($envelope->getDeliveryTag(), AMQP_REQUEUE);
        }

        if ($endpointControlToken->isShutdownRequested()) {
            return false;
        }

        return true;
    }
}
109