|
@@ 275-292 (lines=18) @@
|
| 272 |
|
* @return $this |
| 273 |
|
* @throws QueueException |
| 274 |
|
*/ |
| 275 |
|
public function bury($mixed) |
| 276 |
|
{ |
| 277 |
|
$payload = $this->payload($mixed); |
| 278 |
|
$this->checkMessageFlow($payload->getState(), 'bury'); |
| 279 |
|
|
| 280 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
| 281 |
|
/** @var $tx \Predis\Client */ |
| 282 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken()); |
| 283 |
|
$tx->zadd($this->getKey($payload->getQueue(), self::STATE_BURIED), time(), $payload->getToken()); |
| 284 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_BURIED); |
| 285 |
|
}); |
| 286 |
|
|
| 287 |
|
if (in_array(false, $result, true)) { |
| 288 |
|
throw new QueueException("The message bury error."); |
| 289 |
|
} |
| 290 |
|
|
| 291 |
|
return $this; |
| 292 |
|
} |
| 293 |
|
|
| 294 |
|
/** |
| 295 |
|
* @param Message|string $mixed |
|
@@ 328-345 (lines=18) @@
|
| 325 |
|
* @return $this |
| 326 |
|
* @throws QueueException |
| 327 |
|
*/ |
| 328 |
|
public function kick($mixed) |
| 329 |
|
{ |
| 330 |
|
$payload = $this->payload($mixed); |
| 331 |
|
$this->checkMessageFlow($payload->getState(), 'kick'); |
| 332 |
|
|
| 333 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
| 334 |
|
/** @var $tx \Predis\Client */ |
| 335 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken()); |
| 336 |
|
$tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken()); |
| 337 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY); |
| 338 |
|
}); |
| 339 |
|
|
| 340 |
|
if (in_array(false, $result, true)) { |
| 341 |
|
throw new QueueException("The message kicked error"); |
| 342 |
|
} |
| 343 |
|
|
| 344 |
|
return $this; |
| 345 |
|
} |
| 346 |
|
|
| 347 |
|
/** |
| 348 |
|
* @param Message|string $mixed |