Completed
Push — master ( 3c0f5e...015fa5 )
by Thibaud
03:56
created

AmqpMessageQueueFactory::setLogger()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Alchemy\Queue\Amqp;
4
5
use Alchemy\Queue\MessageQueue;
6
use Alchemy\Queue\MessageQueueFactory;
7
use Psr\Log\LoggerAwareInterface;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\NullLogger;
10
11
class AmqpMessageQueueFactory implements MessageQueueFactory, LoggerAwareInterface
12
{
13
    /**
14
     * @param array $configuration
15
     * @param LoggerInterface $logger
16
     * @return AmqpMessageQueueFactory
17
     */
18
    public static function create(array $configuration = [], LoggerInterface $logger = null)
19
    {
20
        return new self(AmqpConfiguration::parse($configuration), $logger);
21
    }
22
23
    /**
24
     * @var AmqpConfiguration
25
     */
26
    private $configuration;
27
28
    /**
29
     * @var \AMQPExchange
30
     */
31
    private $exchange;
32
33
    /**
34
     * @var \AMQPConnection
35
     */
36
    private $connection;
37
38
    /**
39
     * @var \AMQPChannel
40
     */
41
    private $channel;
42
43
    /**
44
     * @var LoggerInterface
45
     */
46
    private $logger;
47
48 4
    public function __construct(AmqpConfiguration $configuration = null, LoggerInterface $logger = null)
49
    {
50 4
        $this->configuration = $configuration ?: new AmqpConfiguration();
51 4
        $this->logger = $logger ?: new NullLogger();
52 4
    }
53
54
    /**
55
     * Sets a logger instance on the object
56
     *
57
     * @param LoggerInterface $logger
58
     * @return null
59
     */
60
    public function setLogger(LoggerInterface $logger)
61
    {
62
        $this->logger = $logger;
63
    }
64
65
    /**
66
     * @param $name
67
     * @return MessageQueue
68
     */
69 4
    public function getNamedQueue($name)
70
    {
71 4
        $exchange = $this->declareExchange();
72
73 4
        return $this->buildQueue($exchange, $name, $this->configuration->getDeadLetterExchange());
74
    }
75
76
    /**
77
     * @param \AMQPExchange $exchange
78
     * @param $queueName
79
     * @param string $errorExchangeName
80
     * @return \AMQPQueue
81
     */
82 4
    private function buildQueue(\AMQPExchange $exchange, $queueName, $errorExchangeName = '')
83
    {
84 4
        $queue = new \AMQPQueue($exchange->getChannel());
85 4
        $queue->setName($queueName);
86 4
        $queue->setFlags(AMQP_DURABLE);
87
88 4
        if (trim($errorExchangeName) != '') {
89
            $queue->setArgument('x-dead-letter-exchange', $errorExchangeName);
90
        }
91
92 4
        $queue->declareQueue();
93 4
        $queue->bind($exchange->getName(), $queueName);
94
95 4
        $messageQueue = new AmqpMessageQueue($exchange, $queue);
96
97 4
        $messageQueue->setLogger($this->logger);
98
99 4
        return $messageQueue;
100
    }
101
102
103 4
    protected function declareExchange()
104
    {
105 4
        if ($this->exchange == null) {
106 4
            $this->exchange = new \AMQPExchange($this->getChannel());
107
108 4
            $this->exchange->setFlags(AMQP_DURABLE);
109
110 4
            if ($this->configuration->getExchange() != null) {
111 4
                $this->exchange->setType(AMQP_EX_TYPE_DIRECT);
112 4
                $this->exchange->setName($this->configuration->getExchange());
113 4
                $this->exchange->declareExchange();
114 3
            }
115 3
        }
116
117 4
        return $this->exchange;
118
    }
119
120
    /**
121
     * @return \AMQPConnection
122
     */
123 4
    public function getConnection()
124
    {
125 4
        if ($this->connection == null) {
126 4
            $this->connection = new \AMQPConnection($this->configuration->toConnectionArray());
127 4
            $this->connection->connect();
128 3
        }
129
130 4
        return $this->connection;
131
    }
132
133
    /**
134
     * @return \AMQPChannel
135
     */
136 4
    public function getChannel()
137
    {
138 4
        if ($this->channel == null) {
139 4
            $this->channel = new \AMQPChannel($this->getConnection());
140 3
        }
141
142 4
        return $this->channel;
143
    }
144
}
145