RabbitMqMessagePusher   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 71
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 68.18%

Importance

Changes 0
Metric Value
wmc 4
lcom 1
cbo 5
dl 0
loc 71
ccs 15
cts 22
cp 0.6818
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A init() 0 9 2
A start() 0 16 1
1
<?php
2
namespace PSB\Core\Transport\RabbitMq;
3
4
5
use PSB\Core\EndpointControlToken;
6
use PSB\Core\Transport\MessagePusherInterface;
7
use PSB\Core\Transport\PushPipe;
8
use PSB\Core\Transport\PushSettings;
9
use PSB\Core\Transport\ReceiveCancellationToken;
10
11
/**
 * Message pusher that consumes an input queue through the broker model and
 * hands every delivery to the message processor together with the push pipe.
 */
class RabbitMqMessagePusher implements MessagePusherInterface
{
    /**
     * Broker abstraction used to purge and to consume queues.
     *
     * @var BrokerModel
     */
    private $brokerModel;

    /**
     * Processor invoked once for every delivery received by the consumer callback.
     *
     * @var MessageProcessor
     */
    private $messageProcessor;

    /**
     * Pipe passed along to the processor; assigned in init().
     *
     * @var PushPipe
     */
    private $pushPipe;

    /**
     * Source of the input queue, error queue and purge flag; assigned in init().
     *
     * @var PushSettings
     */
    private $pushSettings;

    /**
     * Stores the collaborators; no broker interaction happens here.
     *
     * @param BrokerModel      $brokerModel
     * @param MessageProcessor $messageProcessor
     */
    public function __construct(
        BrokerModel $brokerModel,
        MessageProcessor $messageProcessor
    ) {
        $this->messageProcessor = $messageProcessor;
        $this->brokerModel = $brokerModel;
    }

    /**
     * Remembers the pipe and the settings used later by start().
     *
     * When the settings request a purge on startup, the input queue is purged
     * through the broker model before this method returns.
     *
     * @param PushPipe     $pushPipe
     * @param PushSettings $pushSettings
     */
    public function init(PushPipe $pushPipe, PushSettings $pushSettings)
    {
        $this->pushSettings = $pushSettings;
        $this->pushPipe = $pushPipe;

        // Nothing else to do unless a purge was requested.
        if (!$pushSettings->isPurgeOnStartup()) {
            return;
        }

        $this->brokerModel->purgeQueue($pushSettings->getInputQueue());
    }

    /**
     * Starts consuming the input queue.
     *
     * Each delivery is forwarded to the message processor along with the push
     * pipe, the error queue taken from the settings, and freshly created
     * cancellation and endpoint control tokens. The processor's result is
     * returned from the callback to the broker model.
     *
     * Reads the settings stored by init(), so init() has to be called first.
     */
    public function start()
    {
        $onDelivery = function (\AMQPEnvelope $delivery, \AMQPQueue $sourceQueue) {
            // Pipe and settings are read at delivery time, not captured up front.
            $errorQueue = $this->pushSettings->getErrorQueue();

            return $this->messageProcessor->process(
                $delivery,
                $sourceQueue,
                $this->pushPipe,
                $errorQueue,
                new ReceiveCancellationToken(),
                new EndpointControlToken()
            );
        };

        $this->brokerModel->consume($this->pushSettings->getInputQueue(), $onDelivery);
    }
}
82