Consumer::work()   A
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 33
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 22
c 1
b 0
f 0
nc 4
nop 1
dl 0
loc 33
rs 9.568
1
<?php
2
/**
3
 * This file is part of order_management
4
 * User: Sinan TURGUT <[email protected]>
5
 * Date: 24.06.2019
6
 * php version 7.2
7
 *
8
 * @category Assessment
9
 * @package  OrderManagement
10
 * @author   Sinan TURGUT <[email protected]>
11
 * @license  See LICENSE file
12
 * @link     https://dev.sinanturgut.com.tr
13
 */
14
15
namespace OrderManagement\RabbitMQ;
16
17
/**
18
 * Class Consumer
19
 * @package OrderManagement\RabbitMQ
20
 */
21
class Consumer
22
{
23
    /**
24
     * @var AbstractQueue
25
     */
26
    protected $queue;
27
28
    /**
29
     * @var array
30
     */
31
    protected $callbacks = [];
32
33
    /**
34
     * @param QueueInterface $queue
35
     */
36
    public function __construct(QueueInterface $queue)
37
    {
38
        $this->queue = $queue;
0 ignored issues
show
Documentation Bug introduced by
$queue is of type OrderManagement\RabbitMQ\QueueInterface, but the property $queue was declared to be of type OrderManagement\RabbitMQ\AbstractQueue. Are you sure that you always receive this specific sub-class here, or does it make sense to add an instanceof check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.

Either this assignment is in error or an instanceof check should be added for that assignment.

class Alien {}

class Dalek extends Alien {}

class Plot
{
    /** @var  Dalek */
    public $villain;
}

$alien = new Alien();
$plot = new Plot();
if ($alien instanceof Dalek) {
    $plot->villain = $alien;
}
Loading history...
39
    }
40
41
    /**
42
     * @param $callable
43
     */
44
    public function addCallback($callable)
45
    {
46
        $this->callbacks[] = $callable;
47
    }
48
49
    /**
50
     * @param callable $stream
51
     * @throws \ErrorException
52
     */
53
    public function work(callable $stream)
54
    {
55
        $connection = new Connection();
56
        $channel = $connection->channel();
57
58
        $channel->queue_declare(
59
            $this->queue->getName(),
60
            $this->queue->isPassive(),
61
            $this->queue->isDurable(),
62
            $this->queue->isExclusive(),
63
            $this->queue->isAutoDelete()
64
        );
65
66
        $stream('[*] Waiting for messages. To exit press CTRL+C');
67
68
        foreach ($this->callbacks as $callback) {
69
            $channel->basic_consume(
70
                $this->queue->getName(),
71
                '',
72
                false,
73
                true,
74
                false,
75
                false,
76
                $callback
77
            );
78
        }
79
80
        while (count($channel->callbacks)) {
81
            $channel->wait();
82
        }
83
84
        $channel->close();
85
        $connection->close();
86
    }
87
}
88