These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace hiapi\console; |
||
4 | |||
5 | use hiapi\exceptions\NotProcessableException; |
||
6 | use hiqdev\yii2\autobus\components\AutoBusFactoryInterface; |
||
7 | use hiqdev\yii2\autobus\components\AutoBusInterface; |
||
8 | use hiqdev\yii2\autobus\exceptions\WrongCommandException; |
||
9 | use PhpAmqpLib\Channel\AMQPChannel; |
||
10 | use PhpAmqpLib\Connection\AMQPStreamConnection; |
||
11 | use PhpAmqpLib\Message\AMQPMessage; |
||
12 | use PhpAmqpLib\Wire\AMQPTable; |
||
13 | use Psr\Log\LoggerInterface; |
||
14 | use yii\base\Module; |
||
15 | use yii\console\ExitCode; |
||
16 | use yii\helpers\Console; |
||
17 | |||
/**
 * Console controller that consumes messages from an AMQP queue and runs them
 * as commands on the bus obtained from the bus factory for that queue.
 *
 * Messages whose handling throws {@see NotProcessableException} are re-published
 * through the "<queue>.delayed" exchange; messages failing with any other
 * exception are published to the "<queue>.failed" queue.
 *
 * @author Dmytro Naumenko <[email protected]>
 */
class QueueController extends \yii\console\Controller
{
    /**
     * @var LoggerInterface receives warnings about failed messages and debug notes about retries
     */
    protected $logger;
    /**
     * @var AMQPStreamConnection connection used to open every channel in this controller
     */
    protected $amqp;
    /**
     * @var AutoBusFactoryInterface resolves the bus that handles commands of a given queue
     */
    private $busFactory;

    /**
     * @param string $id controller ID, passed through to the parent controller
     * @param Module $module owning module, passed through to the parent controller
     * @param AMQPStreamConnection $amqp
     * @param LoggerInterface $logger
     * @param AutoBusFactoryInterface $busFactory
     * @param array $config configuration passed through to the parent controller
     */
    public function __construct(
        $id,
        Module $module,
        AMQPStreamConnection $amqp,
        LoggerInterface $logger,
        AutoBusFactoryInterface $busFactory,
        array $config = []
    ) {
        $this->logger = $logger;
        $this->amqp = $amqp;
        $this->busFactory = $busFactory;

        parent::__construct($id, $module, $config);
    }

    /**
     * Opens a new channel and declares the given queue on it
     * (not passive, durable, not exclusive, not auto-deleted).
     *
     * The caller owns the returned channel and is responsible for closing it.
     *
     * @param string $queue name of the queue to declare
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    private function createChannel(string $queue): AMQPChannel
    {
        $channel = $this->amqp->channel();
        $channel->queue_declare($queue, false, true, false, false);

        return $channel;
    }

    /**
     * Consumes messages from the queue until $messagesCount messages were received
     * or the channel has no consumer callbacks left.
     *
     * @param string $queueName
     * @param int $messagesCount number of messages to receive before stopping
     * @return int
     */
    public function actionConsume(string $queueName, $messagesCount = 100)
    {
        // The parameter is untyped, so a numeric string may be passed in;
        // normalise it once so the counter below is always an integer.
        $messagesCount = (int) $messagesCount;

        $channel = $this->createChannel($queueName);
        $bus = $this->busFactory->get($queueName);

        Console::output(' [*] Waiting for messages. To exit press CTRL+C');

        $callback = function (AMQPMessage $msg) use (&$messagesCount, $queueName, $channel, $bus) {
            Console::output(' [x] Received ' . $msg->body);
            // The message is acknowledged before it is processed: failures are
            // re-published explicitly (delayed retry or the ".failed" queue)
            // instead of being redelivered by the broker.
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
            $messagesCount--;

            try {
                $this->handle($bus, $msg);
            } catch (NotProcessableException $e) {
                $this->requeue($queueName, $msg, $e);
            } catch (\Exception $e) {
                $this->handleError($queueName, $msg, $e);
            }
        };

        // Prefetch count of 1.
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);

        while ($channel->callbacks && $messagesCount > 0) {
            $channel->wait();
        }

        Console::output(' [x] Reached consumed messages limit. Stopping process.');

        return ExitCode::OK;
    }

    /**
     * Reports a failed message on the console and in the log,
     * then stores it in the "<queue>.failed" queue.
     *
     * @param string $queueName queue the message was consumed from
     * @param AMQPMessage $message
     * @param \Exception $exception the failure that is being reported
     */
    private function handleError(string $queueName, AMQPMessage $message, \Exception $exception)
    {
        Console::error(' [E] Error: ' . $exception->getMessage());
        $this->logger->warning('Failed to handle message: ' . $exception->getMessage(), ['message' => $message, 'exception' => $exception]);
        $this->storeRejected($queueName, $message, $exception);
    }

    /**
     * Decodes AMQP message and sends it to the handler
     * // TODO: move to separate class?
     *
     * The body must be a JSON object with a string "name"; the part of the name
     * after the last backslash is used as the command name and the whole decoded
     * body is passed to the bus as the command data.
     *
     * @param AutoBusInterface $bus
     * @param AMQPMessage $msg
     * @throws \RuntimeException when the content type is not "application/json"
     * @throws WrongCommandException when the body has no usable "name"
     */
    protected function handle(AutoBusInterface $bus, AMQPMessage $msg): void
    {
        // The property may be absent, so read it with a default instead of
        // triggering an undefined-index error.
        $contentType = $msg->get_properties()['content_type'] ?? null;
        if ($contentType !== 'application/json') {
            // Report the content type: it is the property that was actually checked.
            throw new \RuntimeException('Do not know how to decode ' . ($contentType ?? 'a message without content type'));
        }

        $body = json_decode($msg->getBody(), true);
        // Invalid JSON decodes to null and scalars are not commands; a non-string
        // name would make explode() fail with an error the consumer does not catch.
        if (!is_array($body) || !isset($body['name']) || !is_string($body['name'])) {
            throw new WrongCommandException('Message must have a name');
        }
        $parts = explode('\\', $body['name']);
        $name = array_pop($parts);

        $bus->runCommand($name, $body);
    }

    /**
     * Resends message to queue with a delay
     *
     * The number of previous tries is read from the "x-number-of-tries"
     * application header. When the exception defines a maximum number of tries
     * and it is reached, the message is treated as an error instead.
     *
     * @param string $queueName
     * @param AMQPMessage $msg
     * @param NotProcessableException $exception
     */
    private function requeue(string $queueName, AMQPMessage $msg, NotProcessableException $exception): void
    {
        $tries = 0;
        // A message that was never requeued may carry no application headers at all.
        $headers = $msg->get_properties()['application_headers'] ?? null;
        if ($headers instanceof AMQPTable) {
            $tries = (int) ($headers->getNativeData()['x-number-of-tries'] ?? 0);
        }

        $maxTries = $exception->getMaxTries();
        if ($maxTries !== null && $tries >= $maxTries) {
            $this->logger->debug('No tries left for message. Marking it as an error', ['message' => $msg, 'exception' => $exception]);
            $this->handleError($queueName, $msg, $exception);
            return;
        }

        // Cast after multiplying, so that a fractional progression multiplier
        // (e.g. 1.5) is not truncated before it is applied.
        $delayDuration = (int) (1000 * $exception->getSecondsBeforeRetry() * pow($exception->getProgressionMultiplier(), $tries));

        $channel = $this->amqp->channel();
        try {
            // Init delay exchange
            // NOTE(review): static analysis flagged AMQPTable being passed where an
            // array is documented; php-amqplib is expected to accept AMQPTable for
            // the arguments — confirm against the installed version.
            // NOTE(review): the "x-delayed-message" exchange type is presumably
            // provided by a broker plugin — confirm it is enabled.
            $delayExchange = "$queueName.delayed";
            $channel->exchange_declare($delayExchange, 'x-delayed-message', false, true, true, false, false, new AMQPTable([
                'x-delayed-type' => 'direct',
            ]));
            $channel->queue_bind($queueName, $delayExchange);

            // Send message
            $delayMessage = new AMQPMessage($msg->getBody(), array_merge($msg->get_properties(), [
                'application_headers' => new AMQPTable([
                    'x-delay' => $delayDuration,
                    'x-number-of-tries' => $tries + 1,
                ]),
            ]));
            $channel->basic_publish($delayMessage, $delayExchange, '');
        } finally {
            // The channel is only needed for this single publish; release it so
            // that repeated requeues do not accumulate open channels.
            $channel->close();
        }

        $this->logger->debug('Delayed message for ' . $delayDuration, ['message' => $msg, 'exception' => $exception]);
    }

    /**
     * Publishes the message to the "<queue>.failed" queue.
     *
     * @param string $queueName queue the message was originally consumed from
     * @param AMQPMessage $message
     * @param \Exception $exception currently unused, see the TODO below
     */
    private function storeRejected(string $queueName, AMQPMessage $message, \Exception $exception): void
    {
        // TODO: store $exception as well
        $failedQueue = "$queueName.failed";
        $channel = $this->createChannel($failedQueue);
        try {
            // Publish via the default exchange ('') using the queue name as the
            // routing key: only a queue with this name is declared here, not an
            // exchange, so the name must not be passed as the exchange argument.
            $channel->basic_publish($message, '', $failedQueue);
        } finally {
            // One channel is opened per rejected message; close it to avoid leaking channels.
            $channel->close();
        }
    }
}
||
180 |
It seems like the type of the argument is not accepted by the function/method which you are calling.
In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.
We suggest to add an explicit type cast like in the following example: