RabbitMqMessagePusher::start()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1.1984

Importance

Changes 0
Metric Value
dl 0
loc 16
ccs 5
cts 12
cp 0.4167
rs 9.7333
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1.1984
1
<?php
2
namespace PSB\Core\Transport\RabbitMq;
3
4
5
use PSB\Core\EndpointControlToken;
6
use PSB\Core\Transport\MessagePusherInterface;
7
use PSB\Core\Transport\PushPipe;
8
use PSB\Core\Transport\PushSettings;
9
use PSB\Core\Transport\ReceiveCancellationToken;
10
11
class RabbitMqMessagePusher implements MessagePusherInterface
{
    /**
     * Used to purge the input queue and to consume from it.
     *
     * @var BrokerModel
     */
    private $brokerModel;

    /**
     * Receives every consumed envelope together with the pipe and tokens.
     *
     * @var MessageProcessor
     */
    private $messageProcessor;

    /**
     * Pipe handed to the message processor for each message; set by init().
     *
     * @var PushPipe
     */
    private $pushPipe;

    /**
     * Source of the input/error queue names and the purge flag; set by init().
     *
     * @var PushSettings
     */
    private $pushSettings;

    /**
     * Stores the collaborators; no broker interaction happens here.
     *
     * @param BrokerModel      $brokerModel
     * @param MessageProcessor $messageProcessor
     */
    public function __construct(
        BrokerModel $brokerModel,
        MessageProcessor $messageProcessor
    ) {
        $this->messageProcessor = $messageProcessor;
        $this->brokerModel = $brokerModel;
    }

    /**
     * Remembers the pipe and the settings that start() will use, and purges
     * the input queue when the settings ask for a purge on startup.
     *
     * @param PushPipe     $pushPipe
     * @param PushSettings $pushSettings
     */
    public function init(PushPipe $pushPipe, PushSettings $pushSettings)
    {
        $this->pushPipe = $pushPipe;
        $this->pushSettings = $pushSettings;

        if (!$pushSettings->isPurgeOnStartup()) {
            return;
        }

        $inputQueue = $pushSettings->getInputQueue();
        $this->brokerModel->purgeQueue($inputQueue);
    }

    /**
     * Asks the broker model to consume from the configured input queue,
     * delegating every delivered envelope to the message processor.
     */
    public function start()
    {
        $inputQueue = $this->pushSettings->getInputQueue();

        // The pipe and the error queue are read when a message arrives, not
        // when the callback is created, and each message gets new tokens.
        $onMessage = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $amqpQueue) {
            $pipe = $this->pushPipe;
            $errorQueue = $this->pushSettings->getErrorQueue();
            $cancellationToken = new ReceiveCancellationToken();
            $controlToken = new EndpointControlToken();

            // The processor's result is returned to the broker model as is.
            return $this->messageProcessor->process(
                $amqpEnvelope,
                $amqpQueue,
                $pipe,
                $errorQueue,
                $cancellationToken,
                $controlToken
            );
        };

        $this->brokerModel->consume($inputQueue, $onMessage);
    }
}
82