| 1 | <?php |
||
| 2 | |||
| 3 | namespace Freyo\LaravelQueueCMQ\Queue; |
||
| 4 | |||
| 5 | use Freyo\LaravelQueueCMQ\Queue\Driver\Account; |
||
| 6 | use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException; |
||
| 7 | use Freyo\LaravelQueueCMQ\Queue\Driver\Message; |
||
| 8 | use Freyo\LaravelQueueCMQ\Queue\Driver\Topic; |
||
| 9 | use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob; |
||
| 10 | use Illuminate\Container\Container; |
||
| 11 | use Illuminate\Contracts\Queue\Queue as QueueContract; |
||
| 12 | use Illuminate\Queue\Queue; |
||
| 13 | use Illuminate\Support\Arr; |
||
| 14 | |||
| 15 | class CMQQueue extends Queue implements QueueContract |
||
| 16 | { |
||
| 17 | const CMQ_QUEUE_NO_MESSAGE_CODE = 7000; |
||
| 18 | |||
| 19 | const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag'; |
||
| 20 | const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing'; |
||
| 21 | |||
| 22 | /** |
||
| 23 | * @var array |
||
| 24 | */ |
||
| 25 | protected $queueOptions; |
||
| 26 | |||
| 27 | /** |
||
| 28 | * @var array |
||
| 29 | */ |
||
| 30 | protected $topicOptions; |
||
| 31 | |||
| 32 | /** |
||
| 33 | * @var \Freyo\LaravelQueueCMQ\Queue\Driver\Account |
||
| 34 | */ |
||
| 35 | private $queueAccount; |
||
| 36 | |||
| 37 | /** |
||
| 38 | * @var \Freyo\LaravelQueueCMQ\Queue\Driver\Account |
||
| 39 | */ |
||
| 40 | private $topicAccount; |
||
| 41 | |||
| 42 | /** |
||
| 43 | * @var array |
||
| 44 | */ |
||
| 45 | protected $plainOptions; |
||
| 46 | |||
| 47 | /** |
||
| 48 | * @var \ReflectionMethod |
||
| 49 | */ |
||
| 50 | private $createPayload; |
||
| 51 | |||
| 52 | /** |
||
| 53 | * CMQQueue constructor. |
||
| 54 | * |
||
| 55 | * @param \Freyo\LaravelQueueCMQ\Queue\Driver\Account $queueAccount |
||
| 56 | * @param \Freyo\LaravelQueueCMQ\Queue\Driver\Account $topicAccount |
||
| 57 | * @param array $config |
||
| 58 | * |
||
| 59 | * @throws \ReflectionException |
||
| 60 | */ |
||
| 61 | public function __construct(Account $queueAccount, Account $topicAccount, array $config) |
||
| 62 | { |
||
| 63 | $this->queueAccount = $queueAccount; |
||
| 64 | $this->topicAccount = $topicAccount; |
||
| 65 | |||
| 66 | $this->queueOptions = $config['options']['queue']; |
||
| 67 | $this->topicOptions = $config['options']['topic']; |
||
| 68 | |||
| 69 | $this->plainOptions = Arr::get($config, 'plain', []); |
||
| 70 | |||
| 71 | $this->createPayload = new \ReflectionMethod($this, 'createPayload'); |
||
| 72 | } |
||
| 73 | |||
| 74 | /** |
||
| 75 | * @return bool |
||
| 76 | */ |
||
| 77 | public function isPlain() |
||
| 78 | { |
||
| 79 | return (bool) Arr::get($this->plainOptions, 'enable'); |
||
| 80 | } |
||
| 81 | |||
| 82 | /** |
||
| 83 | * @return string |
||
| 84 | */ |
||
| 85 | public function getPlainJob() |
||
| 86 | { |
||
| 87 | return Arr::get($this->plainOptions, 'job'); |
||
| 88 | } |
||
| 89 | |||
| 90 | /** |
||
| 91 | * Get the size of the queue. |
||
| 92 | * |
||
| 93 | * @param string $queue |
||
| 94 | * |
||
| 95 | * @return int |
||
| 96 | */ |
||
| 97 | public function size($queue = null) |
||
| 98 | { |
||
| 99 | $attributes = $this->getQueue($queue)->get_attributes(); |
||
| 100 | |||
| 101 | return (int) $attributes->activeMsgNum; |
||
| 102 | } |
||
| 103 | |||
| 104 | /** |
||
| 105 | * Push a new job onto the queue. |
||
| 106 | * |
||
| 107 | * @param string|object $job |
||
| 108 | * @param mixed $data |
||
| 109 | * @param string $queue |
||
| 110 | * |
||
| 111 | * @throws \Exception |
||
| 112 | * |
||
| 113 | * @return mixed |
||
| 114 | */ |
||
| 115 | public function push($job, $data = '', $queue = null) |
||
| 116 | { |
||
| 117 | if ($this->isPlain()) { |
||
| 118 | return $this->pushRaw($job->getPayload(), $queue); |
||
| 119 | } |
||
| 120 | |||
| 121 | $payload = $this->createPayload->getNumberOfParameters() === 3 |
||
| 122 | ? $this->createPayload($job, $queue, $data) // version >= 5.7 |
||
| 123 | : $this->createPayload($job, $data); |
||
| 124 | |||
| 125 | return $this->pushRaw($payload, $queue); |
||
| 126 | } |
||
| 127 | |||
| 128 | /** |
||
| 129 | * Push a raw payload onto the queue. |
||
| 130 | * |
||
| 131 | * @param string $payload |
||
| 132 | * @param string $queue |
||
| 133 | * @param array $options |
||
| 134 | * |
||
| 135 | * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerNetworkException |
||
| 136 | * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException |
||
| 137 | * @throws \Exception |
||
| 138 | * |
||
| 139 | * @return \Freyo\LaravelQueueCMQ\Queue\Driver\Message|array |
||
| 140 | */ |
||
| 141 | public function pushRaw($payload, $queue = null, array $options = []) |
||
| 142 | { |
||
| 143 | $message = new Message($payload); |
||
| 144 | |||
| 145 | $driver = $this->parseQueue($queue); |
||
| 146 | |||
| 147 | if ($driver instanceof Topic) { |
||
| 148 | switch ($this->topicOptions['filter']) { |
||
| 149 | case self::CMQ_TOPIC_TAG_FILTER_NAME: |
||
| 150 | return retry(Arr::get($this->topicOptions, 'retries', 3), |
||
| 151 | function () use ($driver, $message, $queue) { |
||
| 152 | return $driver->publish_message($message->msgBody, explode(',', $queue), null); |
||
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||
| 153 | }); |
||
| 154 | case self::CMQ_TOPIC_ROUTING_FILTER_NAME: |
||
| 155 | return retry(Arr::get($this->topicOptions, 'retries', 3), |
||
| 156 | function () use ($driver, $message, $queue) { |
||
| 157 | $driver->publish_message($message->msgBody, [], $queue); |
||
| 158 | }); |
||
| 159 | default: |
||
| 160 | throw new \InvalidArgumentException( |
||
| 161 | 'Invalid CMQ topic filter: '.$this->topicOptions['filter'] |
||
| 162 | ); |
||
| 163 | } |
||
| 164 | } |
||
| 165 | |||
| 166 | return retry(Arr::get($this->queueOptions, 'retries', 3), function () use ($driver, $message, $options) { |
||
| 167 | return $driver->send_message($message, Arr::get($options, 'delay', 0)); |
||
| 168 | }); |
||
| 169 | } |
||
| 170 | |||
| 171 | /** |
||
| 172 | * Push a new job onto the queue after a delay. |
||
| 173 | * |
||
| 174 | * @param \DateTimeInterface|\DateInterval|int $delay |
||
| 175 | * @param string|object $job |
||
| 176 | * @param mixed $data |
||
| 177 | * @param string $queue |
||
| 178 | * |
||
| 179 | * @throws \Exception |
||
| 180 | * |
||
| 181 | * @return mixed |
||
| 182 | */ |
||
| 183 | public function later($delay, $job, $data = '', $queue = null) |
||
| 184 | { |
||
| 185 | $delay = method_exists($this, 'getSeconds') |
||
| 186 | ? $this->getSeconds($delay) |
||
| 187 | : $this->secondsUntil($delay); |
||
| 188 | |||
| 189 | if ($this->isPlain()) { |
||
| 190 | return $this->pushRaw($job->getPayload(), $queue, ['delay' => $delay]); |
||
| 191 | } |
||
| 192 | |||
| 193 | $payload = $this->createPayload->getNumberOfParameters() === 3 |
||
| 194 | ? $this->createPayload($job, $queue, $data) // version >= 5.7 |
||
| 195 | : $this->createPayload($job, $data); |
||
| 196 | |||
| 197 | return $this->pushRaw($payload, $queue, ['delay' => $delay]); |
||
| 198 | } |
||
| 199 | |||
| 200 | /** |
||
| 201 | * Pop the next job off of the queue. |
||
| 202 | * |
||
| 203 | * @param string $queue |
||
| 204 | * |
||
| 205 | * @return \Illuminate\Contracts\Queue\Job|null |
||
| 206 | */ |
||
| 207 | public function pop($queue = null) |
||
| 208 | { |
||
| 209 | try { |
||
| 210 | $queue = $this->getQueue($queue); |
||
| 211 | $message = $queue->receive_message($this->queueOptions['polling_wait_seconds']); |
||
| 212 | } catch (CMQServerException $e) { |
||
| 213 | if (self::CMQ_QUEUE_NO_MESSAGE_CODE === (int) $e->getCode()) { // ignore no message |
||
| 214 | return; |
||
| 215 | } |
||
| 216 | |||
| 217 | throw $e; |
||
| 218 | } |
||
| 219 | |||
| 220 | return new CMQJob( |
||
| 221 | $this->container ?: Container::getInstance(), |
||
| 222 | $this, $message, $queue, $this->connectionName |
||
| 223 | ); |
||
| 224 | } |
||
| 225 | |||
| 226 | /** |
||
| 227 | * Get the queue. |
||
| 228 | * |
||
| 229 | * @param string $queue |
||
| 230 | * |
||
| 231 | * @return Driver\Queue |
||
| 232 | */ |
||
| 233 | public function getQueue($queue = null) |
||
| 234 | { |
||
| 235 | return $this->queueAccount->get_queue($queue ?: $this->queueOptions['name']); |
||
| 236 | } |
||
| 237 | |||
| 238 | /** |
||
| 239 | * Get the topic. |
||
| 240 | * |
||
| 241 | * @param string $topic |
||
| 242 | * |
||
| 243 | * @return Driver\Topic |
||
| 244 | */ |
||
| 245 | public function getTopic($topic = null) |
||
| 246 | { |
||
| 247 | return $this->topicAccount->get_topic($topic ?: $this->topicOptions['name']); |
||
| 248 | } |
||
| 249 | |||
| 250 | /** |
||
| 251 | * Parse name to topic or queue. |
||
| 252 | * |
||
| 253 | * @param string $queue |
||
| 254 | * |
||
| 255 | * @return Driver\Queue|Driver\Topic |
||
| 256 | */ |
||
| 257 | public function parseQueue($queue = null) |
||
| 258 | { |
||
| 259 | if ($this->topicOptions['enable']) { |
||
| 260 | return $this->getTopic($this->topicOptions['name'] ?: $queue); |
||
| 261 | } |
||
| 262 | |||
| 263 | return $this->getQueue($queue ?: $this->queueOptions['name']); |
||
| 264 | } |
||
| 265 | } |
||
| 266 |