Completed
Push — master ( 5da8f0...b74684 )
by Andrii
12:49
created

src/console/QueueController.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace hiapi\console;
4
5
use hiapi\exceptions\NotProcessableException;
6
use hiqdev\yii2\autobus\components\AutoBusFactoryInterface;
7
use hiqdev\yii2\autobus\components\AutoBusInterface;
8
use hiqdev\yii2\autobus\exceptions\WrongCommandException;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AMQPStreamConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
use PhpAmqpLib\Wire\AMQPTable;
13
use Psr\Log\LoggerInterface;
14
use yii\base\Module;
15
use yii\console\ExitCode;
16
use yii\helpers\Console;
17
18
/**
 * Console controller that consumes messages from an AMQP queue and runs them on a command bus.
 *
 * Every received message is acknowledged first and then handled:
 *  - when handling throws {@see NotProcessableException}, the message is republished
 *    through a delayed-message exchange so it comes back to the same queue later;
 *  - when handling throws any other exception, the message is logged and copied
 *    to the "<queue>.failed" queue.
 *
 * @author Dmytro Naumenko <[email protected]>
 */
class QueueController extends \yii\console\Controller
{
    /**
     * @var LoggerInterface receives warnings about failed messages and debug records about retries
     */
    protected $logger;
    /**
     * @var AMQPStreamConnection connection all channels of this controller are opened on
     */
    protected $amqp;
    /**
     * @var AutoBusFactoryInterface provides the bus that runs commands of a given queue
     */
    private $busFactory;

    /**
     * @param string $id passed through to the parent controller
     * @param Module $module passed through to the parent controller
     * @param AMQPStreamConnection $amqp
     * @param LoggerInterface $logger
     * @param AutoBusFactoryInterface $busFactory
     * @param array $config passed through to the parent controller
     */
    public function __construct(
        $id,
        Module $module,
        AMQPStreamConnection $amqp,
        LoggerInterface $logger,
        AutoBusFactoryInterface $busFactory,
        array $config = []
    ) {
        $this->logger = $logger;
        $this->amqp = $amqp;
        $this->busFactory = $busFactory;

        parent::__construct($id, $module, $config);
    }

    /**
     * Opens a new channel and declares the given queue on it.
     *
     * The queue is declared non-passive, durable, non-exclusive and without auto-delete.
     *
     * @param string $queue name of the queue to declare
     * @return AMQPChannel the freshly opened channel; the caller owns it
     */
    private function createChannel(string $queue): AMQPChannel
    {
        $channel = $this->amqp->channel();
        $channel->queue_declare($queue, false, true, false, false);

        return $channel;
    }

    /**
     * Consumes messages from the queue until the requested number of messages was received.
     *
     * @param string $queueName queue to consume; the same name is used to get the bus from the factory
     * @param int $messagesCount how many messages to receive before stopping
     * @return int exit code
     */
    public function actionConsume(string $queueName, $messagesCount = 100)
    {
        // The parameter is untyped and may arrive as a numeric string,
        // so normalise it once before it is used as a counter.
        $messagesCount = (int) $messagesCount;

        $channel = $this->createChannel($queueName);
        $bus = $this->busFactory->get($queueName);

        Console::output(' [*] Waiting for messages. To exit press CTRL+C');

        $callback = function (AMQPMessage $msg) use (&$messagesCount, $queueName, $channel, $bus) {
            Console::output(' [x] Received ' . $msg->body);
            // NOTE(review): the message is acknowledged BEFORE it is handled. Retries and
            // failures are implemented by republishing (see requeue()/storeRejected()), so
            // a crash between this ack and the end of handling loses the message.
            // Confirm this delivery guarantee is the intended one.
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
            $messagesCount--;

            try {
                $this->handle($bus, $msg);
            } catch (NotProcessableException $e) {
                $this->requeue($queueName, $msg, $e);
            } catch (\Exception $e) {
                $this->handleError($queueName, $msg, $e);
            }
        };

        // Deliver one unacknowledged message at a time. Explicit 0/false are used instead
        // of null because basic_qos() is documented as (int, int, bool).
        $channel->basic_qos(0, 1, false);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);

        while ($channel->callbacks && $messagesCount > 0) {
            $channel->wait();
        }

        Console::output(' [x] Reached consumed messages limit. Stopping process.');

        return ExitCode::OK;
    }

    /**
     * Reports a message that could not be handled and stores it in the failed queue.
     *
     * @param string $queueName queue the message was consumed from
     * @param AMQPMessage $message the message that failed
     * @param \Exception $exception the reason of the failure
     */
    private function handleError(string $queueName, AMQPMessage $message, \Exception $exception)
    {
        Console::error(' [E] Error: ' . $exception->getMessage());
        $this->logger->warning(
            'Failed to handle message: ' . $exception->getMessage(),
            ['message' => $message, 'exception' => $exception]
        );
        $this->storeRejected($queueName, $message, $exception);
    }

    /**
     * Decodes AMQP message and sends it to the handler
     * // TODO: move to separate class?
     *
     * The command name is the last backslash-separated segment of the "name" field
     * of the decoded JSON body; the whole decoded body is passed to the bus as well.
     *
     * @param AutoBusInterface $bus bus that runs the command
     * @param AMQPMessage $msg message to decode
     * @throws \RuntimeException when the message content type is not "application/json"
     * @throws WrongCommandException when the body is not a JSON object/array with a string "name"
     */
    protected function handle(AutoBusInterface $bus, AMQPMessage $msg): void
    {
        // The property is absent when the publisher did not set it, so read it with a default.
        $contentType = $msg->get_properties()['content_type'] ?? null;
        if ($contentType !== 'application/json') {
            // Report the value that was actually checked (the content type).
            throw new \RuntimeException(
                'Do not know how to decode ' . ($contentType ?? 'message without content type')
            );
        }

        $body = json_decode($msg->getBody(), true);
        if (!is_array($body) || !isset($body['name']) || !is_string($body['name'])) {
            throw new WrongCommandException('Message must have a name');
        }
        $parts = explode('\\', $body['name']);
        $name = array_pop($parts);

        $bus->runCommand($name, $body);
    }

    /**
     * Resends message to queue with a delay
     *
     * The number of previous tries is kept in the "x-number-of-tries" application header.
     * When the exception defines a maximum number of tries and it is reached, the message
     * is treated as an error instead of being delayed again.
     *
     * @param string $queueName queue the message was consumed from and is routed back to
     * @param AMQPMessage $msg message to retry
     * @param NotProcessableException $exception provides the retry policy (max tries, delay, multiplier)
     */
    private function requeue(string $queueName, AMQPMessage $msg, NotProcessableException $exception): void
    {
        // A message that was never delayed before may have no application headers at all.
        $headers = $msg->get_properties()['application_headers'] ?? null;
        $tries = 0;
        if ($headers instanceof AMQPTable) {
            $tries = (int) ($headers->getNativeData()['x-number-of-tries'] ?? 0);
        }

        $maxTries = $exception->getMaxTries();
        if ($maxTries !== null && $tries >= $maxTries) {
            $this->logger->debug(
                'No tries left for message. Marking it as an error',
                ['message' => $msg, 'exception' => $exception]
            );
            $this->handleError($queueName, $msg, $exception);
            return;
        }

        // Delay in milliseconds: base delay multiplied by multiplier^tries. The product is
        // rounded once at the end, so a fractional multiplier is not truncated to an integer
        // before it is applied, and "x-delay" is always an integer.
        $delayDuration = (int) round(
            1000 * $exception->getSecondsBeforeRetry() * pow($exception->getProgressionMultiplier(), $tries)
        );

        $channel = $this->amqp->channel();
        try {
            // Init delay exchange
            $delayExchange = "$queueName.delayed";
            // The arguments are wrapped in AMQPTable on purpose: it lets the library
            // encode the native values itself.
            $channel->exchange_declare($delayExchange, 'x-delayed-message', false, true, true, false, false, new AMQPTable([
                'x-delayed-type' => 'direct',
            ]));
            $channel->queue_bind($queueName, $delayExchange);

            // Send message. All original properties are kept except "application_headers",
            // which is replaced by a table holding only the delay and the tries counter.
            $delayMessage = new AMQPMessage($msg->getBody(), array_merge($msg->get_properties(), [
                'application_headers' => new AMQPTable([
                    'x-delay' => $delayDuration,
                    'x-number-of-tries' => $tries + 1,
                ]),
            ]));
            $channel->basic_publish($delayMessage, $delayExchange, '');
        } finally {
            // This channel is opened for a single publish; close it so that channels
            // do not accumulate on the connection while the consumer keeps running.
            $channel->close();
        }

        $this->logger->debug('Delayed message for ' . $delayDuration, ['message' => $msg, 'exception' => $exception]);
    }

    /**
     * Copies a message that could not be handled to the "<queue>.failed" queue.
     *
     * @param string $queueName queue the message was consumed from
     * @param AMQPMessage $message message to store
     * @param \Exception $exception the reason of the failure (not stored yet)
     */
    private function storeRejected(string $queueName, AMQPMessage $message, \Exception $exception): void
    {
        // TODO: store $exception as well
        $failedQueue = "$queueName.failed";
        $channel = $this->createChannel($failedQueue);
        try {
            // Only a queue with this name is declared, no exchange. Publish through the
            // default exchange (''), which routes by queue name given as the routing key.
            $channel->basic_publish($message, '', $failedQueue);
        } finally {
            $channel->close();
        }
    }
}
180