1 | <?php |
||||||
2 | |||||||
3 | namespace Freyo\LaravelQueueRocketMQ\Queue; |
||||||
4 | |||||||
5 | use Freyo\LaravelQueueRocketMQ\Queue\Jobs\RocketMQJob; |
||||||
6 | use Illuminate\Container\Container; |
||||||
7 | use Illuminate\Contracts\Queue\Queue as QueueContract; |
||||||
8 | use Illuminate\Queue\Queue; |
||||||
9 | use Illuminate\Support\Arr; |
||||||
10 | use MQ\Model\TopicMessage; |
||||||
11 | use MQ\MQClient; |
||||||
12 | |||||||
13 | class RocketMQQueue extends Queue implements QueueContract |
||||||
14 | { |
||||||
15 | /** |
||||||
16 | * @var array |
||||||
17 | */ |
||||||
18 | protected $config; |
||||||
19 | |||||||
20 | /** |
||||||
21 | * @var \ReflectionMethod |
||||||
22 | */ |
||||||
23 | private $createPayload; |
||||||
24 | |||||||
25 | /** |
||||||
26 | * @var \MQ\MQClient |
||||||
27 | */ |
||||||
28 | protected $client; |
||||||
29 | |||||||
30 | /** |
||||||
31 | * RocketMQQueue constructor. |
||||||
32 | * |
||||||
33 | * @param \MQ\MQClient $client |
||||||
34 | * @param array $config |
||||||
35 | * |
||||||
36 | * @throws \ReflectionException |
||||||
37 | */ |
||||||
38 | public function __construct(MQClient $client, array $config) |
||||||
39 | { |
||||||
40 | $this->client = $client; |
||||||
41 | $this->config = $config; |
||||||
42 | |||||||
43 | $this->createPayload = new \ReflectionMethod($this, 'createPayload'); |
||||||
44 | } |
||||||
45 | |||||||
46 | /** |
||||||
47 | * @return bool |
||||||
48 | */ |
||||||
49 | 1 | public function isPlain() |
|||||
50 | { |
||||||
51 | 1 | return (bool)Arr::get($this->config, 'plain.enable'); |
|||||
52 | } |
||||||
53 | |||||||
54 | /** |
||||||
55 | * @return string |
||||||
56 | */ |
||||||
57 | 1 | public function getPlainJob() |
|||||
58 | { |
||||||
59 | 1 | return Arr::get($this->config, 'plain.job'); |
|||||
60 | } |
||||||
61 | |||||||
62 | /** |
||||||
63 | * Get the size of the queue. |
||||||
64 | * |
||||||
65 | * @param string $queue |
||||||
66 | * |
||||||
67 | * @return int |
||||||
68 | */ |
||||||
69 | 1 | public function size($queue = null) |
|||||
70 | { |
||||||
71 | 1 | return 1; |
|||||
72 | } |
||||||
73 | |||||||
74 | /** |
||||||
75 | * Push a new job onto the queue. |
||||||
76 | * |
||||||
77 | * @param string|object $job |
||||||
78 | * @param mixed $data |
||||||
79 | * @param string $queue |
||||||
80 | * |
||||||
81 | * @throws \Exception |
||||||
82 | * |
||||||
83 | * @return mixed |
||||||
84 | */ |
||||||
85 | public function push($job, $data = '', $queue = null) |
||||||
86 | { |
||||||
87 | if ($this->isPlain()) { |
||||||
88 | return $this->pushRaw($job->getPayload(), $queue); |
||||||
89 | } |
||||||
90 | |||||||
91 | $payload = $this->createPayload->getNumberOfParameters() === 3 |
||||||
92 | ? $this->createPayload($job, $queue, $data) // version >= 5.7 |
||||||
0 ignored issues
–
show
It seems like
$job can also be of type object ; however, parameter $job of Illuminate\Queue\Queue::createPayload() does only seem to accept string , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
93 | : $this->createPayload($job, $data); |
||||||
94 | |||||||
95 | return $this->pushRaw($payload, $queue); |
||||||
96 | } |
||||||
97 | |||||||
98 | /** |
||||||
99 | * Push a raw payload onto the queue. |
||||||
100 | * |
||||||
101 | * @param string $payload |
||||||
102 | * @param string $queue |
||||||
103 | * @param array $options |
||||||
104 | * |
||||||
105 | * @throws \Exception |
||||||
106 | * |
||||||
107 | * @return TopicMessage |
||||||
108 | */ |
||||||
109 | public function pushRaw($payload, $queue = null, array $options = []) |
||||||
110 | { |
||||||
111 | $message = new TopicMessage($payload); |
||||||
112 | |||||||
113 | if ($this->config['use_message_tag'] && $queue) { |
||||||
114 | $message->setMessageTag($queue); |
||||||
115 | } |
||||||
116 | |||||||
117 | if ($delay = Arr::get($options, 'delay', 0)) { |
||||||
118 | $message->setStartDeliverTime(time() * 1000 + $delay * 1000); |
||||||
119 | } |
||||||
120 | |||||||
121 | return $this->getProducer( |
||||||
122 | $this->config['use_message_tag'] ? $this->config['queue'] : $queue |
||||||
123 | )->publishMessage($message); |
||||||
124 | } |
||||||
125 | |||||||
126 | /** |
||||||
127 | * Push a new job onto the queue after a delay. |
||||||
128 | * |
||||||
129 | * @param \DateTimeInterface|\DateInterval|int $delay |
||||||
130 | * @param string|object $job |
||||||
131 | * @param mixed $data |
||||||
132 | * @param string $queue |
||||||
133 | * |
||||||
134 | * @throws \Exception |
||||||
135 | * |
||||||
136 | * @return mixed |
||||||
137 | */ |
||||||
138 | public function later($delay, $job, $data = '', $queue = null) |
||||||
139 | { |
||||||
140 | $delay = method_exists($this, 'getSeconds') |
||||||
141 | ? $this->getSeconds($delay) |
||||||
142 | : $this->secondsUntil($delay); |
||||||
143 | |||||||
144 | if ($this->isPlain()) { |
||||||
145 | return $this->pushRaw($job->getPayload(), $queue, ['delay' => $delay]); |
||||||
146 | } |
||||||
147 | |||||||
148 | $payload = $this->createPayload->getNumberOfParameters() === 3 |
||||||
149 | ? $this->createPayload($job, $queue, $data) // version >= 5.7 |
||||||
0 ignored issues
–
show
It seems like
$job can also be of type object ; however, parameter $job of Illuminate\Queue\Queue::createPayload() does only seem to accept string , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() The call to
Illuminate\Queue\Queue::createPayload() has too many arguments starting with $data .
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue. If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above. ![]() |
|||||||
150 | : $this->createPayload($job, $data); |
||||||
151 | |||||||
152 | return $this->pushRaw($payload, $queue, ['delay' => $delay]); |
||||||
153 | } |
||||||
154 | |||||||
155 | /** |
||||||
156 | * Pop the next job off of the queue. |
||||||
157 | * |
||||||
158 | * @param string $queue |
||||||
159 | * |
||||||
160 | * @return \Illuminate\Contracts\Queue\Job|null |
||||||
161 | * |
||||||
162 | * @throws \Exception |
||||||
163 | */ |
||||||
164 | public function pop($queue = null) |
||||||
165 | { |
||||||
166 | try { |
||||||
167 | |||||||
168 | $consumer = $this->config['use_message_tag'] |
||||||
169 | ? $this->getConsumer($this->config['queue'], $queue) |
||||||
170 | : $this->getConsumer($queue); |
||||||
171 | |||||||
172 | /** @var array $messages */ |
||||||
173 | $messages = $consumer->consumeMessage(1, $this->config['wait_seconds']); |
||||||
174 | |||||||
175 | } catch (\Exception $e) { |
||||||
176 | if ($e instanceof \MQ\Exception\MessageNotExistException) { |
||||||
177 | return null; |
||||||
178 | } |
||||||
179 | |||||||
180 | throw $e; |
||||||
181 | } |
||||||
182 | |||||||
183 | return new RocketMQJob( |
||||||
184 | $this->container ?: Container::getInstance(), |
||||||
185 | $this, |
||||||
186 | Arr::first($messages), |
||||||
0 ignored issues
–
show
It seems like
Illuminate\Support\Arr::first($messages) can also be of type null ; however, parameter $message of Freyo\LaravelQueueRocket...ketMQJob::__construct() does only seem to accept MQ\Model\Message , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
187 | $this->config['use_message_tag'] ? $this->config['queue'] : $queue, |
||||||
188 | $this->connectionName ?? null |
||||||
189 | ); |
||||||
190 | } |
||||||
191 | |||||||
192 | /** |
||||||
193 | * Get the consumer. |
||||||
194 | * |
||||||
195 | * @param string $topicName |
||||||
196 | * @param string $messageTag |
||||||
197 | * |
||||||
198 | * @return \MQ\MQConsumer |
||||||
199 | */ |
||||||
200 | 1 | public function getConsumer($topicName = null, $messageTag = null) |
|||||
201 | { |
||||||
202 | 1 | return $this->client->getConsumer( |
|||||
203 | 1 | $this->config['instance_id'], |
|||||
204 | 1 | $topicName ?: $this->config['queue'], |
|||||
205 | 1 | $this->config['group_id'], |
|||||
206 | 1 | $messageTag |
|||||
207 | ); |
||||||
208 | } |
||||||
209 | |||||||
210 | /** |
||||||
211 | * Get the producer. |
||||||
212 | * |
||||||
213 | * @param string $topicName |
||||||
214 | * |
||||||
215 | * @return \MQ\MQProducer |
||||||
216 | */ |
||||||
217 | 1 | public function getProducer($topicName = null) |
|||||
218 | { |
||||||
219 | 1 | return $this->client->getProducer( |
|||||
220 | 1 | $this->config['instance_id'], |
|||||
221 | 1 | $topicName ?: $this->config['queue'] |
|||||
222 | ); |
||||||
223 | } |
||||||
224 | } |
||||||
225 |
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.