Issues (12)

Security Analysis    no request data  

This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.

  File Inclusion
File Inclusion enables an attacker to inject custom files into PHP's file loading mechanism, either explicitly passed to include, or for example via PHP's auto-loading mechanism.
  Regex Injection
Regex Injection enables an attacker to execute arbitrary code in your PHP process.
  SQL Injection
SQL Injection enables an attacker to execute arbitrary SQL code on your database server gaining access to user data, or manipulating user data.
  Response Splitting
Response Splitting can be used to send arbitrary responses.
  File Manipulation
File Manipulation enables an attacker to write custom data to files. This potentially leads to injection of arbitrary code on the server.
  Object Injection
Object Injection enables an attacker to inject an object into PHP code, and can lead to arbitrary code execution, file exposure, or file manipulation attacks.
  File Exposure
File Exposure allows an attacker to gain access to local files that they should not be able to access. These files can, for example, include database credentials or other configuration files.
  XML Injection
XML Injection enables an attacker to read files on your local filesystem including configuration files, or can be abused to freeze your web-server process.
  Code Injection
Code Injection enables an attacker to execute arbitrary code on the server.
  Variable Injection
Variable Injection enables an attacker to overwrite program variables with custom data, and can lead to further vulnerabilities.
  XPath Injection
XPath Injection enables an attacker to modify the parts of an XML document that are read. If that XML document is, for example, used for authentication, this can lead to further vulnerabilities similar to SQL Injection.
  Other Vulnerability
This category comprises other attack vectors such as manipulating the PHP runtime, loading custom extensions, freezing the runtime, or similar.
  Command Injection
Command Injection enables an attacker to inject a shell command that is executed with the privileges of the web-server. This can be used to expose sensitive data, or gain access to your server.
  LDAP Injection
LDAP Injection enables an attacker to inject LDAP statements potentially granting permission to run unauthorized queries, or modify content inside the LDAP tree.
  Cross-Site Scripting
Cross-Site Scripting enables an attacker to inject code into the response of a web-request that is viewed by other users. It can for example be used to bypass access controls, or even to take over other users' accounts.
  Header Injection
Unfortunately, the security analysis is currently not available for your project. If you are a non-commercial open-source project, please contact support to gain access.

src/Entity/QueueEntity.php (4 issues)

<?php

namespace NeedleProject\LaravelRabbitMq\Entity;

use NeedleProject\LaravelRabbitMq\AMQPConnection;
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
use NeedleProject\LaravelRabbitMq\Interpreter\EntityArgumentsInterpreter;
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
use NeedleProject\LaravelRabbitMq\PublisherInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use PhpAmqpLib\Exception\AMQPChannelClosedException;

/**
 * Class QueueEntity
 *
 * @package NeedleProject\LaravelRabbitMq\Entity
 * @author  Adrian Tilita <[email protected]>
 */
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const int   Retry count when a Channel Closed exception is thrown
     */
    const MAX_RETRIES = 3;

    /**
     * @const array Default connections parameters
     */
    const DEFAULTS = [
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
        'passive'                      => false,
        // Entities with durable will be re-created upon server restart
        'durable'                      => false,
        // whether to use it by only one channel, then it gets deleted
        'exclusive'                    => false,
        // Whether to delete it when the queue has no event on it
        'auto_delete'                  => false,
        // Whether to receive a Declare confirmation
        'nowait'                       => false,
        // Additional arguments for queue creation
        'arguments'                    => [],
        // Whether to auto create the entity before publishing/consuming it
        'auto_create'                  => false,
        // whether to "hide" the exception on re-declare.
        // if the `passive` filter is set true, this is redundant
        'throw_exception_on_redeclare' => true,
        // whether to throw an exception when trying to
        // bind to a non-existent queue/exchange
        'throw_exception_on_bind_fail' => true,
        // no idea what it represents - @todo - find documentation that states its role
        'ticket'                       => null
    ];

    /**
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * @var string
     */
    protected $aliasName;

    /**
     * @var array
     */
    protected $attributes;

    /**
     * @var int
     */
    protected $prefetchCount = 1;

    /**
     * @var null|string|MessageProcessorInterface
     */
    protected $messageProcessor = null;

    /**
     * @var int
     */
    protected $limitMessageCount;

    /**
     * @var int
     */
    protected $limitSecondsUptime;

    /**
     * @var int
     */
    protected $limitMemoryConsumption;

    /**
     * @var double
     */
    protected $startTime = 0;

    /**
     * @var int
     */
    protected $retryCount = 0;
    /**
     * @var bool
     */
    protected $globalPrefetch = true;

    /**
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $queueDetails
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $queueDetails)
    {
        return new static(
            $connection,
            $aliasName,
            array_merge(self::DEFAULTS, $queueDetails)
        );
    }

    /**
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * QueueEntity constructor.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->connection = $connection;
        $this->aliasName  = $aliasName;
        $this->attributes = $attributes;
    }

    /**
     * @param int $prefetchCount
     * @return ConsumerInterface
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;
        return $this;
    }

    /**
     * @param string $messageProcessor
     * @return ConsumerInterface
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;
        return $this;
    }

    /**
     * @param bool $globalPrefetch
     * @return ConsumerInterface
     */
    public function setGlobalPrefetch(bool $globalPrefetch): ConsumerInterface
    {
        $this->globalPrefetch = $globalPrefetch;

        return $this;
    }

    /**
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }

    /**
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        return $this->getConnection()->getChannel();
    }

    /**
     * Create the Queue
     */
    public function create()
    {
        try {
            $this->getChannel()
                ->queue_declare(
                    $this->attributes['name'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['exclusive'],
                    $this->attributes['auto_delete'],
                    $this->attributes['nowait'],
                    EntityArgumentsInterpreter::interpretArguments(
                        $this->attributes['arguments']
                    ),
                    $this->attributes['ticket']
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
                throw $e;
            }
            // a failure triggers the channel's closing process
            $this->reconnect();
        }
    }

    public function bind()
    {
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
            return;
        }
        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $this->attributes['name'],
                        $bindItem['exchange'],
                        $bindItem['routing_key'] ?? ''
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
                    throw $e;
                }
                $this->reconnect();
            }
        }
    }

    /**
     * Delete the queue
     */
    public function delete()
    {
        $this->getChannel()->queue_delete($this->attributes['name']);
    }

    /**
     * {@inheritdoc}
     */
    public function reconnect()
    {
        $this->getConnection()->reconnect();
    }

    /**
     * Publish a message
     *
     * @param string $message
     * @param string $routingKey
     * @param array $properties
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     */
    public function publish(string $message, string $routingKey = '', array $properties = [])
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        try {
            $this->getChannel()
                ->basic_publish(
                    new AMQPMessage(
                        $message,
                        EntityArgumentsInterpreter::interpretProperties(
                            $this->attributes,
                            $properties
                        )
                    ),
                    '',
                    $this->attributes['name'],
                    true
                );
            $this->retryCount = 0;
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            // Retry publishing with re-connect
            if ($this->retryCount < self::MAX_RETRIES) {
                $this->getConnection()->reconnect();
                $this->publish($message, $routingKey);

                return;
            }
            throw $exception;
        }
    }

    /**
     * {@inheritdoc}
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     * @return int
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);
        while (false === $this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait(null, false, $seconds);
            } catch (AMQPTimeoutException $e) {
                if ($this->shouldStopConsuming()) {
                    break;
                }
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $e) {
                // stop the consumer
                $this->stopConsuming();
                $this->logger->notice(sprintf(
The method notice() does not exist on null. (Ignorable by Annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation:

                $this->logger->/** @scrutinizer ignore-call */
                               notice(sprintf(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

                    "Stopped consuming: %s in %s:%d",
                    get_class($e) . ' - ' . $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                ));
                return 1;
            }
        }
        return 0;
    }
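
The notice() on null report in startConsuming() above arises because LoggerAwareTrait leaves $logger unset until setLogger() is called. Below is a minimal sketch of one possible guard, assuming a silent fallback logger is acceptable. SimpleLogger, SilentLogger, RecordingLogger and Worker are hypothetical stand-ins so the example runs without psr/log; in the real project, PSR-3's Psr\Log\NullLogger would presumably play the role of SilentLogger.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a PSR-3 style logger interface (illustration only).
interface SimpleLogger
{
    public function notice(string $message): void;
}

// No-op logger used when nothing was injected.
final class SilentLogger implements SimpleLogger
{
    public function notice(string $message): void
    {
    }
}

// Logger that keeps messages in memory so the behaviour can be inspected.
final class RecordingLogger implements SimpleLogger
{
    /** @var string[] */
    public array $messages = [];

    public function notice(string $message): void
    {
        $this->messages[] = $message;
    }
}

// Hypothetical worker whose logger is optional, like a LoggerAwareTrait user.
final class Worker
{
    private ?SimpleLogger $logger = null;

    public function setLogger(SimpleLogger $logger): void
    {
        $this->logger = $logger;
    }

    // Always returns a usable logger, so callers never dereference null.
    private function logger(): SimpleLogger
    {
        return $this->logger ?? new SilentLogger();
    }

    public function reportStop(\Throwable $e): void
    {
        $this->logger()->notice(sprintf(
            'Stopped consuming: %s in %s:%d',
            get_class($e) . ' - ' . $e->getMessage(),
            $e->getFile(),
            $e->getLine()
        ));
    }
}
```

With this shape, calling reportStop() before any logger is injected silently does nothing instead of failing, which matters in an error path whose only job is to report why the consumer stopped.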
    /**
     * @return bool
     */
    protected function shouldStopConsuming(): bool
    {
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
            $this->logger->debug(
                "Stopped consumer",
                [
                    'limit' => 'time_limit',
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
                ]
            );
            return true;
        }
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
            $this->logger->debug(
                "Stopped consumer",
                [
                    'limit' => 'memory_limit',
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
                ]
            );
            return true;
        }

        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
            $this->logger->debug(
                "Stopped consumer",
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
            );
            return true;
        }
        return false;
    }

    /**
     * Stop the consumer
     */
    public function stopConsuming()
    {
        try {
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $e) {
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
        }
    }

    /**
     * Setup the consumer
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        $this->limitMessageCount = $messages;
        $this->limitSecondsUptime = $seconds;
        $this->limitMemoryConsumption = $maxMemory;

        $this->startTime = microtime(true);
Documentation Bug
It seems like microtime(true) can also be of type string. However, the property $startTime is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $account_id that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }
}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account_id instanceof Id) {
    $account->id = $account_id;
}

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }
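
The $startTime report in setupConsumer() above concerns the declared type rather than runtime behaviour: microtime(true) returns a float, while microtime() without the flag returns a string. Below is a small sketch of how the uptime check might be kept strictly typed. UptimeLimit and its methods are hypothetical names, not part of the library.

```php
<?php
declare(strict_types=1);

// Hypothetical helper that tracks an uptime limit with explicit float types.
final class UptimeLimit
{
    private float $startTime;
    private float $limitSeconds;

    public function __construct(float $limitSeconds, ?float $startTime = null)
    {
        $this->limitSeconds = $limitSeconds;
        // microtime(true) yields seconds since the Unix epoch as a float.
        $this->startTime = $startTime ?? microtime(true);
    }

    // Seconds elapsed between the start time and $now (defaults to the current time).
    public function elapsed(?float $now = null): float
    {
        return ($now ?? microtime(true)) - $this->startTime;
    }

    // Same comparison as in shouldStopConsuming(): strictly greater than the limit.
    public function exceeded(?float $now = null): bool
    {
        return $this->elapsed($now) > $this->limitSeconds;
    }
}
```

Allowing the start time and the current time to be passed in is a design choice for this sketch only. It makes the limit testable without sleeping.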
    private function setupChannelConsumer()
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        $this->getChannel()
            ->basic_qos(null, $this->prefetchCount, $this->globalPrefetch);

        $this->getChannel()
            ->basic_consume(
                $this->attributes['name'],
                $this->getConsumerTag(),
                false,
                false,
                false,
                false,
                [
                    $this,
                    'consume'
                ]
            );
    }

    /**
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
     */
    private function registerShutdownHandler()
    {
        $consumer = $this;
        register_shutdown_function(function () use ($consumer) {
            $consumer->stopConsuming();
        });
    }

    /**
     * Register signals
     */
    protected function handleKillSignals()
    {
        if (extension_loaded('pcntl')) {
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);

            if (function_exists('pcntl_signal_dispatch')) {
                // let the signal go forward
                pcntl_signal_dispatch();
            }
        }
    }

    /**
     * Handle Kill Signals
     * @param int $signalNumber
     */
    public function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
    }

    /**
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
    }

    /**
     * @return MessageProcessorInterface
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
            $this->messageProcessor = app($this->messageProcessor);
Documentation Bug
It seems like app($this->messageProcessor) can also be of type Illuminate\Contracts\Foundation\Application or Illuminate\Foundation\Application. However, the property $messageProcessor is declared as type NeedleProject\LaravelRab...orInterface|null|string. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly. Either this assignment is in error or a type check should be added for that assignment.

            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
                $this->messageProcessor->setLogger($this->logger);
It seems like $this->logger can also be of type null; however, parameter $logger of NeedleProject\LaravelRab...eProcessor::setLogger() only seems to accept Psr\Log\LoggerInterface. Maybe add an additional type check? (Ignorable by Annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-type annotation:

                $this->messageProcessor->setLogger(/** @scrutinizer ignore-type */ $this->logger);

            }
        }
        return $this->messageProcessor;
    }
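
The first report in getMessageProcessor() above comes from assigning a loosely typed value to a more strictly documented property. One way to add the suggested type check is to resolve into a local variable and validate it before storing it. This is only a sketch: Processor, CountingProcessor, ProcessorHolder and the $resolver callable (standing in for Laravel's app() helper) are hypothetical names.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for MessageProcessorInterface.
interface Processor
{
    public function getProcessedMessages(): int;
}

final class CountingProcessor implements Processor
{
    public function getProcessedMessages(): int
    {
        return 0;
    }
}

final class ProcessorHolder
{
    /** @var string|Processor class name until first use, instance afterwards */
    private $processor;

    /** @var callable stand-in for a container lookup such as app() */
    private $resolver;

    public function __construct(string $processorClass, callable $resolver)
    {
        $this->processor = $processorClass;
        $this->resolver = $resolver;
    }

    public function getProcessor(): Processor
    {
        if (!($this->processor instanceof Processor)) {
            $resolved = ($this->resolver)($this->processor);
            // Validate before assigning, so the property never holds an unexpected type.
            if (!($resolved instanceof Processor)) {
                throw new \UnexpectedValueException(sprintf(
                    'Expected a %s, got %s',
                    Processor::class,
                    is_object($resolved) ? get_class($resolved) : gettype($resolved)
                ));
            }
            $this->processor = $resolved;
        }
        return $this->processor;
    }
}
```

A misconfigured processor class then fails at the point of resolution with a descriptive exception, rather than later when a method is called on the wrong object.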
    /**
     * @param AMQPMessage $message
     * @throws \Throwable
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $this->getMessageProcessor()->consume($message);
            $this->logger->debug("Consumed message", ['message' => $message->getBody()]);
        } catch (\Throwable $e) {
            $this->logger->notice(
                sprintf(
                    "Got %s from %s in %d",
                    $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                )
            );
            // let the exception slide, the processor should handle
            // exception, this is just a notice that should not
            // ever appear
            throw $e;
        }
    }
}