This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Drupal\rabbitmq\Queue; |
||
4 | |||
5 | use Drupal\Core\Config\ImmutableConfig; |
||
6 | use Drupal\Core\Extension\ModuleHandlerInterface; |
||
7 | use Drupal\rabbitmq\ConnectionFactory; |
||
8 | use PhpAmqpLib\Channel\AMQPChannel; |
||
9 | use PhpAmqpLib\Exception\AMQPProtocolChannelException; |
||
10 | use Psr\Log\LoggerInterface; |
||
11 | |||
12 | /** |
||
13 | * Low-level Queue API implementation for RabbitMQ on top of AMQPlib. |
||
14 | * |
||
15 | * This class contains the low-level properties and methods not exposed by |
||
16 | * the Queue API ReliableQueueInterface: those are implemented in Queue.php. |
||
17 | * |
||
18 | * @see \Drupal\rabbitmq\Queue\Queue |
||
19 | */ |
||
20 | abstract class QueueBase { |
||
21 | |||
22 | const LOGGER_CHANNEL = 'rabbitmq'; |
||
23 | |||
24 | const MODULE = 'rabbitmq'; |
||
25 | |||
26 | /** |
||
27 | * Object that holds a channel to RabbitMQ. |
||
28 | * |
||
29 | * @var \PhpAmqpLib\Channel\AMQPChannel |
||
30 | */ |
||
31 | protected $channel; |
||
32 | |||
33 | /** |
||
34 | * The RabbitMQ connection service. |
||
35 | * |
||
36 | * @var \PhpAmqpLib\Connection\AbstractConnection |
||
37 | */ |
||
38 | protected $connection; |
||
39 | |||
40 | /** |
||
41 | * The logger service. |
||
42 | * |
||
43 | * @var \Psr\Log\LoggerInterface |
||
44 | */ |
||
45 | protected $logger; |
||
46 | |||
47 | /** |
||
48 | * The module handler service. |
||
49 | * |
||
50 | * @var \Drupal\Core\Extension\ModuleHandlerInterface |
||
51 | */ |
||
52 | protected $moduleHandler; |
||
53 | |||
54 | /** |
||
55 | * The name of the queue. |
||
56 | * |
||
57 | * @var string |
||
58 | */ |
||
59 | protected $name; |
||
60 | |||
61 | /** |
||
62 | * The queue options. |
||
63 | * |
||
64 | * @var array |
||
65 | */ |
||
66 | protected $options; |
||
67 | |||
68 | /** |
||
69 | * A queue array: [writer, item count, consumer count]. |
||
70 | * |
||
71 | * @var array|null |
||
72 | */ |
||
73 | protected $queue; |
||
74 | |||
75 | /** |
||
76 | * Constructor. |
||
77 | * |
||
78 | * @param string $name |
||
79 | * The name of the queue to work with: an arbitrary string. |
||
80 | * @param \Drupal\rabbitmq\ConnectionFactory $connectionFactory |
||
81 | * The RabbitMQ connection factory. |
||
82 | * @param \Drupal\Core\Extension\ModuleHandlerInterface $moduleHandler |
||
83 | * The module handler service. |
||
84 | * @param \Psr\Log\LoggerInterface $logger |
||
85 | * The logger service. |
||
86 | * @param \Drupal\Core\Config\ImmutableConfig $moduleConfig |
||
87 | * The config.factory service. |
||
88 | */ |
||
89 | 5 | public function __construct( |
|
90 | string $name, |
||
91 | ConnectionFactory $connectionFactory, |
||
92 | ModuleHandlerInterface $moduleHandler, |
||
93 | LoggerInterface $logger, |
||
94 | ImmutableConfig $moduleConfig |
||
95 | ) { |
||
96 | $this->name = $name; |
||
97 | 5 | $this->connection = $connectionFactory->getConnection(); |
|
98 | $this->moduleHandler = $moduleHandler; |
||
99 | 5 | $this->logger = $logger; |
|
100 | 5 | ||
101 | 5 | // Check our active storage to find the the queue config. |
|
102 | $queues = $moduleConfig->get('queues'); |
||
103 | |||
104 | 5 | $this->options = ['name' => $name]; |
|
105 | 5 | ||
106 | 5 | if (isset($queues[$name])) { |
|
107 | 5 | $this->options += $queues[$name]; |
|
108 | } |
||
109 | 5 | ||
110 | 5 | // Declare any exchanges required if configured. |
|
111 | $exchanges = $moduleConfig->get('exchanges'); |
||
112 | if ($exchanges) { |
||
113 | foreach ($exchanges as $exchangeName => $exchangeOptions) { |
||
114 | $this->connection->channel()->exchange_declare( |
||
115 | $exchangeName, |
||
116 | $exchangeOptions['type'] ?? 'direct', |
||
117 | $exchangeOptions['passive'] ?? FALSE, |
||
118 | $exchangeOptions['durable'] ?? TRUE, |
||
119 | $exchangeOptions['auto_delete'] ?? FALSE, |
||
120 | $exchangeOptions['internal'] ?? FALSE, |
||
121 | $exchangeOptions['nowait'] ?? FALSE |
||
122 | ); |
||
123 | 5 | } |
|
124 | } |
||
125 | } |
||
126 | |||
127 | /** |
||
128 | * Obtain an initialized channel to the queue. |
||
129 | * |
||
130 | * @return \PhpAmqpLib\Channel\AMQPChannel |
||
131 | 5 | * The queue channel. |
|
132 | 5 | */ |
|
133 | 5 | public function getChannel() { |
|
134 | 5 | if (FALSE === isset($this->channel)) { |
|
135 | 5 | $this->channel = $this->connection |
|
136 | ->getConnection() |
||
137 | ->channel(); |
||
138 | 5 | ||
139 | // Initialize a queue on the channel. |
||
140 | $this->getQueue($this->channel); |
||
141 | 5 | } |
|
142 | |||
143 | return $this->channel; |
||
144 | } |
||
145 | |||
146 | /** |
||
147 | * Declare a queue and obtain information about the queue. |
||
148 | * |
||
149 | * @param \PhpAmqpLib\Channel\AMQPChannel $channel |
||
150 | * The queue channel. |
||
151 | * @param array $optionsOverrides |
||
152 | * Options overriding the queue defaults. |
||
153 | * |
||
154 | * @return mixed|null |
||
0 ignored issues
–
show
|
|||
155 | * Not strongly specified by php-amqplib. Expected to be a 3-item array: |
||
156 | * - ProtocolWriter |
||
157 | * - Number of items |
||
158 | 5 | * - Number of clients |
|
159 | 5 | */ |
|
160 | protected function getQueue(AMQPChannel $channel, array $optionsOverrides = []) { |
||
161 | 5 | if (FALSE === isset($this->queue)) { |
|
162 | // Declare the queue. |
||
163 | 5 | $options = array_merge($this->options, $optionsOverrides); |
|
164 | 5 | try { |
|
165 | 5 | $this->queue = $channel->queue_declare( |
|
166 | 5 | $this->name, |
|
167 | 5 | $options['passive'] ?? FALSE, |
|
168 | 5 | $options['durable'] ?? TRUE, |
|
169 | 5 | $options['exclusive'] ?? FALSE, |
|
170 | 5 | $options['auto_delete'] ?? TRUE, |
|
171 | 5 | $options['nowait'] ?? FALSE, |
|
172 | $options['arguments'] ?? NULL, |
||
173 | $options['ticket'] ?? NULL |
||
174 | 2 | ); |
|
175 | 2 | } |
|
176 | catch (AMQPProtocolChannelException $e) { |
||
177 | return NULL; |
||
178 | } |
||
179 | 5 | ||
180 | // Bind the queue to an exchange if defined. |
||
181 | if ($this->queue && !empty($options['routing_keys'])) { |
||
182 | foreach ($options['routing_keys'] as $routingKey) { |
||
183 | list($exchange) = explode('.', $routingKey, 2); |
||
184 | $this->channel->queue_bind($this->name, $exchange, $routingKey); |
||
185 | } |
||
186 | } |
||
187 | 5 | } |
|
188 | |||
189 | return $this->queue; |
||
190 | } |
||
191 | |||
192 | /** |
||
193 | * Shutdown. |
||
194 | */ |
||
195 | public function shutdown() { |
||
196 | if ($this->channel) { |
||
197 | $this->channel->close(); |
||
198 | } |
||
199 | if ($this->connection) { |
||
200 | $this->connection->close(); |
||
201 | } |
||
202 | } |
||
203 | |||
204 | } |
||
205 |
This check looks for the generic type
array
as a return type and suggests a more specific type. This type is inferred from the actual code.