Completed
Push — master ( 98de9a...c9581f )
by Thibaud
02:37
created

AmqpMessageQueueFactory::create()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Alchemy\Queue\Amqp;
4
5
use Alchemy\Queue\MessageQueue;
6
use Alchemy\Queue\MessageQueueFactory;
7
8
/**
 * Builds AMQP-backed message queues.
 *
 * The connection, channel and exchange are created lazily on first use and
 * then reused for every queue produced by the same factory instance.
 */
class AmqpMessageQueueFactory implements MessageQueueFactory
{
    /**
     * @var AmqpConfiguration
     */
    private $configuration;

    /**
     * @var \AMQPExchange|null Set by declareExchange() once the exchange is ready
     */
    private $exchange;

    /**
     * @var \AMQPConnection|null Set by getConnection() once connected
     */
    private $connection;

    /**
     * @var \AMQPChannel|null Set by getChannel() on first use
     */
    private $channel;

    /**
     * Creates a factory from a raw configuration array.
     *
     * @param array $configuration Values handed to AmqpConfiguration::parse()
     * @return AmqpMessageQueueFactory
     */
    public static function create(array $configuration = [])
    {
        return new self(AmqpConfiguration::parse($configuration));
    }

    /**
     * @param AmqpConfiguration|null $configuration Falls back to a default-constructed
     *                                              AmqpConfiguration when omitted
     */
    public function __construct(AmqpConfiguration $configuration = null)
    {
        $this->configuration = $configuration ?: new AmqpConfiguration();
    }

    /**
     * Declares (if needed) the exchange and returns a durable queue with the given name.
     *
     * The configured dead letter exchange, if any, is attached to the queue.
     *
     * @param string $name Queue name, also used as the binding routing key
     * @return MessageQueue
     */
    public function getNamedQueue($name)
    {
        $exchange = $this->declareExchange();

        return $this->buildQueue($exchange, $name, $this->configuration->getDeadLetterExchange());
    }

    /**
     * Declares a durable queue and wraps it, together with its exchange, in an AmqpMessageQueue.
     *
     * @param \AMQPExchange $exchange
     * @param string $queueName Queue name, also used as the binding routing key
     * @param string|null $errorExchangeName Dead letter exchange name; blank or null disables it
     * @return AmqpMessageQueue
     */
    private function buildQueue(\AMQPExchange $exchange, $queueName, $errorExchangeName = '')
    {
        $queue = new \AMQPQueue($exchange->getChannel());
        $queue->setName($queueName);
        $queue->setFlags(AMQP_DURABLE);

        // Cast first: the configuration may hand over null, and trim(null) is
        // deprecated as of PHP 8.1.
        if (trim((string) $errorExchangeName) !== '') {
            $queue->setArgument('x-dead-letter-exchange', $errorExchangeName);
        }

        $queue->declareQueue();

        // An unnamed exchange is the broker's default exchange. Queues are already
        // implicitly bound to it under their own name, and brokers such as RabbitMQ
        // refuse explicit bind operations on it, so only bind to a named exchange.
        $exchangeName = (string) $exchange->getName();
        if ($exchangeName !== '') {
            $queue->bind($exchangeName, $queueName);
        }

        return new AmqpMessageQueue($exchange, $queue);
    }

    /**
     * Returns the shared exchange, creating it on first call.
     *
     * When an exchange name is configured, the exchange is declared as a durable
     * direct exchange with that name; otherwise the exchange object is left unnamed
     * and undeclared.
     *
     * @return \AMQPExchange
     */
    protected function declareExchange()
    {
        if ($this->exchange === null) {
            $exchange = new \AMQPExchange($this->getChannel());
            $exchange->setFlags(AMQP_DURABLE);

            $exchangeName = $this->configuration->getExchange();

            // Loose comparison on purpose: both null and '' mean "no exchange configured".
            if ($exchangeName != null) {
                $exchange->setType(AMQP_EX_TYPE_DIRECT);
                $exchange->setName($exchangeName);
                $exchange->declareExchange();
            }

            // Cache only once fully set up, so a failed declaration is retried on
            // the next call instead of handing out an undeclared exchange.
            $this->exchange = $exchange;
        }

        return $this->exchange;
    }

    /**
     * Returns the shared connection, opening it on first call.
     *
     * @return \AMQPConnection
     */
    public function getConnection()
    {
        if ($this->connection === null) {
            $connection = new \AMQPConnection($this->configuration->toConnectionArray());
            $connection->connect();

            // Cache only after connect() succeeded, so a failed attempt is retried
            // on the next call instead of returning an unconnected object.
            $this->connection = $connection;
        }

        return $this->connection;
    }

    /**
     * Returns the shared channel, opening it on the shared connection on first call.
     *
     * @return \AMQPChannel
     */
    public function getChannel()
    {
        if ($this->channel === null) {
            $this->channel = new \AMQPChannel($this->getConnection());
        }

        return $this->channel;
    }
}
120