1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Alchemy\Queue\Amqp; |
4
|
|
|
|
5
|
|
|
use Alchemy\Queue\MessageQueue; |
6
|
|
|
use Alchemy\Queue\MessageQueueFactory; |
7
|
|
|
use Psr\Log\LoggerAwareInterface; |
8
|
|
|
use Psr\Log\LoggerInterface; |
9
|
|
|
use Psr\Log\NullLogger; |
10
|
|
|
|
11
|
|
|
class AmqpMessageQueueFactory implements MessageQueueFactory, LoggerAwareInterface |
12
|
|
|
{ |
13
|
|
|
/** |
14
|
|
|
* @param array $configuration |
15
|
|
|
* @param LoggerInterface $logger |
16
|
|
|
* @return AmqpMessageQueueFactory |
17
|
|
|
*/ |
18
|
|
|
public static function create(array $configuration = [], LoggerInterface $logger = null) |
19
|
|
|
{ |
20
|
|
|
return new self(AmqpConfiguration::parse($configuration), $logger); |
21
|
|
|
} |
22
|
|
|
|
23
|
|
|
/** |
24
|
|
|
* @var AmqpConfiguration |
25
|
|
|
*/ |
26
|
|
|
private $configuration; |
27
|
|
|
|
28
|
|
|
/** |
29
|
|
|
* @var \AMQPExchange |
30
|
|
|
*/ |
31
|
|
|
private $exchange; |
32
|
|
|
|
33
|
|
|
/** |
34
|
|
|
* @var \AMQPConnection |
35
|
|
|
*/ |
36
|
|
|
private $connection; |
37
|
|
|
|
38
|
|
|
/** |
39
|
|
|
* @var \AMQPChannel |
40
|
|
|
*/ |
41
|
|
|
private $channel; |
42
|
|
|
|
43
|
|
|
/** |
44
|
|
|
* @var LoggerInterface |
45
|
|
|
*/ |
46
|
|
|
private $logger; |
47
|
|
|
|
48
|
4 |
|
public function __construct(AmqpConfiguration $configuration = null, LoggerInterface $logger = null) |
49
|
|
|
{ |
50
|
4 |
|
$this->configuration = $configuration ?: new AmqpConfiguration(); |
51
|
4 |
|
$this->logger = $logger ?: new NullLogger(); |
52
|
4 |
|
} |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* Sets a logger instance on the object |
56
|
|
|
* |
57
|
|
|
* @param LoggerInterface $logger |
58
|
|
|
* @return null |
59
|
|
|
*/ |
60
|
|
|
public function setLogger(LoggerInterface $logger) |
61
|
|
|
{ |
62
|
|
|
$this->logger = $logger; |
63
|
|
|
} |
64
|
|
|
|
65
|
|
|
/** |
66
|
|
|
* @param $name |
67
|
|
|
* @return MessageQueue |
68
|
|
|
*/ |
69
|
4 |
|
public function getNamedQueue($name) |
70
|
|
|
{ |
71
|
4 |
|
$exchange = $this->declareExchange(); |
72
|
|
|
|
73
|
4 |
|
return $this->buildQueue($exchange, $name, $this->configuration->getDeadLetterExchange()); |
74
|
|
|
} |
75
|
|
|
|
76
|
|
|
/** |
77
|
|
|
* @param \AMQPExchange $exchange |
78
|
|
|
* @param $queueName |
79
|
|
|
* @param string $errorExchangeName |
80
|
|
|
* @return \AMQPQueue |
81
|
|
|
*/ |
82
|
4 |
|
private function buildQueue(\AMQPExchange $exchange, $queueName, $errorExchangeName = '') |
83
|
|
|
{ |
84
|
4 |
|
$queue = new \AMQPQueue($exchange->getChannel()); |
85
|
4 |
|
$queue->setName($queueName); |
86
|
4 |
|
$queue->setFlags(AMQP_DURABLE); |
87
|
|
|
|
88
|
4 |
|
if (trim($errorExchangeName) != '') { |
89
|
|
|
$queue->setArgument('x-dead-letter-exchange', $errorExchangeName); |
90
|
|
|
} |
91
|
|
|
|
92
|
4 |
|
$queue->declareQueue(); |
93
|
4 |
|
$queue->bind($exchange->getName(), $queueName); |
94
|
|
|
|
95
|
4 |
|
$messageQueue = new AmqpMessageQueue($exchange, $queue); |
96
|
|
|
|
97
|
4 |
|
$messageQueue->setLogger($this->logger); |
98
|
|
|
|
99
|
4 |
|
return $messageQueue; |
100
|
|
|
} |
101
|
|
|
|
102
|
|
|
|
103
|
4 |
|
protected function declareExchange() |
104
|
|
|
{ |
105
|
4 |
|
if ($this->exchange == null) { |
106
|
4 |
|
$this->exchange = new \AMQPExchange($this->getChannel()); |
107
|
|
|
|
108
|
4 |
|
$this->exchange->setFlags(AMQP_DURABLE); |
109
|
|
|
|
110
|
4 |
|
if ($this->configuration->getExchange() != null) { |
111
|
4 |
|
$this->exchange->setType(AMQP_EX_TYPE_DIRECT); |
112
|
4 |
|
$this->exchange->setName($this->configuration->getExchange()); |
113
|
4 |
|
$this->exchange->declareExchange(); |
114
|
3 |
|
} |
115
|
3 |
|
} |
116
|
|
|
|
117
|
4 |
|
return $this->exchange; |
118
|
|
|
} |
119
|
|
|
|
120
|
|
|
/** |
121
|
|
|
* @return \AMQPConnection |
122
|
|
|
*/ |
123
|
4 |
|
public function getConnection() |
124
|
|
|
{ |
125
|
4 |
|
if ($this->connection == null) { |
126
|
4 |
|
$this->connection = new \AMQPConnection($this->configuration->toConnectionArray()); |
127
|
|
|
|
128
|
4 |
|
if ($this->configuration->getTimeout() > 0) { |
129
|
|
|
$this->connection->setTimeout($this->configuration->getTimeout()); |
|
|
|
|
130
|
|
|
} |
131
|
|
|
|
132
|
4 |
|
$this->connection->connect(); |
133
|
3 |
|
} |
134
|
|
|
|
135
|
4 |
|
return $this->connection; |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
/** |
139
|
|
|
* @return \AMQPChannel |
140
|
|
|
*/ |
141
|
4 |
|
public function getChannel() |
142
|
|
|
{ |
143
|
4 |
|
if ($this->channel == null) { |
144
|
4 |
|
$this->channel = new \AMQPChannel($this->getConnection()); |
145
|
3 |
|
} |
146
|
|
|
|
147
|
4 |
|
return $this->channel; |
148
|
|
|
} |
149
|
|
|
} |
150
|
|
|
|
This method has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.