Completed
Pull Request — master (#2)
by Thibaud
04:29
created

AmqpMessageQueueFactory::create()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Alchemy\Queue\Amqp;
4
5
use Alchemy\Queue\MessageQueue;
6
use Alchemy\Queue\MessageQueueFactory;
7
use Psr\Log\LoggerAwareInterface;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\NullLogger;
10
11
/**
 * Factory producing AMQP-backed message queues.
 *
 * The connection, the channel and the exchange are created lazily on first
 * use and are then reused for every queue built by this factory instance.
 */
class AmqpMessageQueueFactory implements MessageQueueFactory, LoggerAwareInterface
{
    /**
     * Builds a factory from a raw configuration array.
     *
     * @param array $configuration Raw settings, passed to AmqpConfiguration::parse()
     * @param LoggerInterface|null $logger Logger handed to the queues; a NullLogger is used when omitted
     * @return AmqpMessageQueueFactory
     */
    public static function create(array $configuration = [], LoggerInterface $logger = null)
    {
        return new self(AmqpConfiguration::parse($configuration), $logger);
    }

    /**
     * @var AmqpConfiguration
     */
    private $configuration;

    /**
     * Lazily declared exchange, see declareExchange().
     *
     * @var \AMQPExchange|null
     */
    private $exchange;

    /**
     * Lazily opened connection, see getConnection().
     *
     * @var \AMQPConnection|null
     */
    private $connection;

    /**
     * Lazily opened channel, see getChannel().
     *
     * @var \AMQPChannel|null
     */
    private $channel;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param AmqpConfiguration|null $configuration Defaults to a fresh AmqpConfiguration
     * @param LoggerInterface|null $logger Defaults to a NullLogger
     */
    public function __construct(AmqpConfiguration $configuration = null, LoggerInterface $logger = null)
    {
        $this->configuration = $configuration ?: new AmqpConfiguration();
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * Sets a logger instance on the object.
     *
     * Only queues built after this call receive the new logger; the logger is
     * passed to each queue at the time it is built.
     *
     * @param LoggerInterface $logger
     * @return void
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Declares (if needed) the exchange and returns a queue bound to it.
     *
     * @param string $name Queue name, also used as the binding routing key
     * @return MessageQueue
     */
    public function getNamedQueue($name)
    {
        $exchange = $this->declareExchange();

        return $this->buildQueue($exchange, $name, $this->configuration->getDeadLetterExchange());
    }

    /**
     * Declares a durable queue, binds it to the exchange and wraps it.
     *
     * @param \AMQPExchange $exchange Exchange the queue is bound to
     * @param string $queueName Queue name, also used as the routing key
     * @param string|null $errorExchangeName Dead-letter exchange; ignored when null or blank
     * @return AmqpMessageQueue
     */
    private function buildQueue(\AMQPExchange $exchange, $queueName, $errorExchangeName = '')
    {
        $queue = new \AMQPQueue($exchange->getChannel());
        $queue->setName($queueName);
        $queue->setFlags(AMQP_DURABLE);

        // The cast keeps trim() from receiving null when no dead-letter exchange is configured.
        if (trim((string) $errorExchangeName) !== '') {
            $queue->setArgument('x-dead-letter-exchange', $errorExchangeName);
        }

        $queue->declareQueue();
        $queue->bind($exchange->getName(), $queueName);

        $messageQueue = new AmqpMessageQueue($exchange, $queue);
        $messageQueue->setLogger($this->logger);

        return $messageQueue;
    }

    /**
     * Returns the shared exchange, creating and declaring it on first use.
     *
     * @return \AMQPExchange
     */
    protected function declareExchange()
    {
        if ($this->exchange === null) {
            $exchange = new \AMQPExchange($this->getChannel());
            $exchange->setFlags(AMQP_DURABLE);

            $exchangeName = $this->configuration->getExchange();

            // Loose comparison on purpose: an empty name is treated like an unset one.
            if ($exchangeName != null) {
                $exchange->setType(AMQP_EX_TYPE_DIRECT);
                $exchange->setName($exchangeName);
                $exchange->declareExchange();
            }

            // Cache only once the declaration went through, so that a failure
            // is retried on the next call instead of reusing a half-built exchange.
            $this->exchange = $exchange;
        }

        return $this->exchange;
    }

    /**
     * Returns the shared connection, opening it on first use.
     *
     * @return \AMQPConnection
     */
    public function getConnection()
    {
        if ($this->connection === null) {
            $connection = new \AMQPConnection($this->configuration->toConnectionArray());

            $timeout = $this->configuration->getTimeout();

            if ($timeout > 0) {
                // setTimeout() is deprecated in favour of setReadTimeout().
                $connection->setReadTimeout($timeout);
            }

            $connection->connect();

            // Cache only after connect() succeeded, so that a failed attempt
            // does not leave an unconnected instance behind for later callers.
            $this->connection = $connection;
        }

        return $this->connection;
    }

    /**
     * Returns the shared channel, opening it on the connection on first use.
     *
     * @return \AMQPChannel
     */
    public function getChannel()
    {
        if ($this->channel === null) {
            $this->channel = new \AMQPChannel($this->getConnection());
        }

        return $this->channel;
    }
}
150