@@ 275-292 (lines=18) @@ | ||
272 | * @return $this |
|
273 | * @throws QueueException |
|
274 | */ |
|
275 | public function bury($mixed) |
|
276 | { |
|
277 | $payload = $this->payload($mixed); |
|
278 | $this->checkMessageFlow($payload->getState(), 'bury'); |
|
279 | ||
280 | $result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
|
281 | /** @var $tx \Predis\Client */ |
|
282 | $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken()); |
|
283 | $tx->zadd($this->getKey($payload->getQueue(), self::STATE_BURIED), time(), $payload->getToken()); |
|
284 | $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_BURIED); |
|
285 | }); |
|
286 | ||
287 | if (in_array(false, $result, true)) { |
|
288 | throw new QueueException("The message bury error."); |
|
289 | } |
|
290 | ||
291 | return $this; |
|
292 | } |
|
293 | ||
294 | /** |
|
295 | * @param Message|string $mixed |
|
@@ 328-345 (lines=18) @@ | ||
325 | * @return $this |
|
326 | * @throws QueueException |
|
327 | */ |
|
328 | public function kick($mixed) |
|
329 | { |
|
330 | $payload = $this->payload($mixed); |
|
331 | $this->checkMessageFlow($payload->getState(), 'kick'); |
|
332 | ||
333 | $result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
|
334 | /** @var $tx \Predis\Client */ |
|
335 | $tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken()); |
|
336 | $tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken()); |
|
337 | $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY); |
|
338 | }); |
|
339 | ||
340 | if (in_array(false, $result, true)) { |
|
341 | throw new QueueException("The message kicked error"); |
|
342 | } |
|
343 | ||
344 | return $this; |
|
345 | } |
|
346 | ||
347 | /** |
|
348 | * @param Message|string $mixed |