Completed
Push — master ( 932cc5...b796f0 )
by Andrii
12:16
created

QueueController::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 14
c 0
b 0
f 0
rs 9.4285
cc 1
eloc 11
nc 1
nop 6
1
<?php
2
3
namespace hiapi\console;
4
5
use hiapi\bus\ApiCommandsBusInterface;
6
use hiapi\commands\BaseCommand;
7
use hiqdev\yii2\autobus\components\AutoBusFactoryInterface;
8
use hiqdev\yii2\autobus\components\AutoBusInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AMQPStreamConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
use Psr\Log\LoggerInterface;
13
use yii\base\Module;
14
use yii\console\ExitCode;
15
use yii\helpers\Console;
16
17
/**
18
 * Class QueueController
19
 *
20
 * @author Dmytro Naumenko <[email protected]>
21
 */
22
class QueueController extends \yii\console\Controller
23
{
24
    /**
25
     * @var LoggerInterface
26
     */
27
    private $logger;
28
    /**
29
     * @var AMQPStreamConnection
30
     */
31
    private $amqp;
32
    /**
33
     * @var BusFactoryInterface
34
     */
35
    private $busFactory;
36
37
    public function __construct(
38
        $id,
39
        Module $module,
40
        AMQPStreamConnection $amqp,
41
        LoggerInterface $logger,
42
        AutoBusFactoryInterface $busFactory,
43
        array $config = []
44
    ) {
45
        parent::__construct($id, $module, $config);
46
47
        $this->logger = $logger;
48
        $this->amqp = $amqp;
49
        $this->busFactory = $busFactory;
0 ignored issues
show
Documentation Bug introduced by
It seems like $busFactory of type object<hiqdev\yii2\autob...utoBusFactoryInterface> is incompatible with the declared type object<hiapi\console\BusFactoryInterface> of property $busFactory.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
50
    }
51
52
    /**
53
     * @return \PhpAmqpLib\Channel\AMQPChannel
54
     */
55
    private function createChannel(string $queue): AMQPChannel
56
    {
57
        $channel = $this->amqp->channel();
58
        $channel->queue_declare($queue, false, true, false, false);
59
60
        return $channel;
61
    }
62
63
    /**
64
     * @param string $queueName
65
     * @param int $messagesCount
66
     * @return int
67
     */
68
    public function actionConsume(string $queueName, $messagesCount = 100)
69
    {
70
        $channel = $this->createChannel($queueName);
71
        $bus = $this->busFactory->get($queueName);
72
73
        Console::output(' [*] Waiting for messages. To exit press CTRL+C');
74
75
        $callback = function (AMQPMessage $msg) use (&$messagesCount, $channel, $bus) {
76
            Console::output(' [x] Received ' . $msg->body);
77
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
78
            $messagesCount--;
79
80
            try {
81
                $this->handle($bus, $msg);
82
            } catch (\Error $e) {
0 ignored issues
show
Bug introduced by
The class Error does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
83
                Console::error(' [E] Error: ' . $e->getMessage());
84
                $this->logger->error('Failed to handle message', ['message' => $msg, 'exception' => $e]);
85
            }
86
        };
87
88
        $channel->basic_qos(null, 1, null);
0 ignored issues
show
Documentation introduced by
null is of type null, but the function expects a boolean.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
89
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
90
91
        while ($channel->callbacks && $messagesCount > 0) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $channel->callbacks of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
92
            $channel->wait();
93
        }
94
95
        Console::output(' [x] Reached consumed messages limit. Stopping process.');
96
97
        return ExitCode::OK;
98
    }
99
100
    /**
101
     * Decodes AMQP message and sends it to the handler
102
     * // TODO: move to separate class
103
     *
104
     * @param AMQPMessage $msg
105
     */
106
    protected function handle(AutoBusInterface $bus, AMQPMessage $msg)
107
    {
108
        if ($msg->get_properties()['content_type'] !== 'application/json') {
109
            throw new \RuntimeException('Do not know how to decode ' . $msg->getContentEncoding());
110
        }
111
112
        $body = json_decode($msg->getBody(), true);
113
        if (!isset($body['name'])) {
114
            $this->logger->error('Message is not supported', ['message' => $body]);
115
        }
116
        $parts = explode('\\', $body['name']);
117
        $name =  array_pop($parts);
118
119
        try {
120
            $bus->runCommand($name, $body);
121
        } catch (\Exception $e) {
122
            $this->logger->error('Failed to load message to command', ['message' => $body, 'exception' => $e]);
123
        }
124
    }
125
}
126