|
@@ 245-260 (lines=16) @@
|
| 242 |
|
* @return $this |
| 243 |
|
* @throws QueueException |
| 244 |
|
*/ |
| 245 |
|
public function bury($mixed)
{
    // Normalise the argument (token string or message object) into a payload,
    // then let checkMessageFlow() vet the 'bury' transition against the
    // payload's current state (presumably it throws when the move is not
    // allowed -- confirm against its definition).
    $message = $this->payload($mixed);
    $this->checkMessageFlow($message->getState(), 'bury');

    $outcome = $this->getRedis()->transaction(
        function ($pipe) use ($message) {
            /** @var \Predis\Client $pipe */

            // Take the token out of the queue's reserved sorted set...
            $reservedKey = $this->getKey($message->getQueue(), self::STATE_RESERVED);
            $pipe->zrem($reservedKey, $message->getToken());

            // ...add it to the buried sorted set, scored with the current
            // Unix timestamp...
            $buriedKey = $this->getKey($message->getQueue(), self::STATE_BURIED);
            $pipe->zadd($buriedKey, time(), $message->getToken());

            // ...and record the new state in the message-to-state hash.
            $stateHash = $this->ns(self::NS_MESSAGE_TO_STATE);
            $pipe->hset($stateHash, $message->getToken(), self::STATE_BURIED);
        }
    );

    // Hand the transaction replies to the shared result check for 'bury'.
    $this->checkTransactionResult($outcome, 'bury');

    // Fluent interface: allow call chaining.
    return $this;
}
| 261 |
|
|
| 262 |
|
/** |
| 263 |
|
* @param string|MessageInterface $mixed |
|
@@ 292-307 (lines=16) @@
|
| 289 |
|
* @return $this |
| 290 |
|
* @throws QueueException |
| 291 |
|
*/ |
| 292 |
|
public function kick($mixed)
{
    // Normalise the argument (token string or message object) into a payload,
    // then let checkMessageFlow() vet the 'kick' transition against the
    // payload's current state (presumably it throws when the move is not
    // allowed -- confirm against its definition).
    $message = $this->payload($mixed);
    $this->checkMessageFlow($message->getState(), 'kick');

    $outcome = $this->getRedis()->transaction(
        function ($pipe) use ($message) {
            /** @var \Predis\Client $pipe */

            // Take the token out of the queue's buried sorted set...
            $buriedKey = $this->getKey($message->getQueue(), self::STATE_BURIED);
            $pipe->zrem($buriedKey, $message->getToken());

            // ...append it to the tail of the ready list...
            $readyKey = $this->getKey($message->getQueue(), self::STATE_READY);
            $pipe->rpush($readyKey, $message->getToken());

            // ...and record the new state in the message-to-state hash.
            $stateHash = $this->ns(self::NS_MESSAGE_TO_STATE);
            $pipe->hset($stateHash, $message->getToken(), self::STATE_READY);
        }
    );

    // Hand the transaction replies to the shared result check for 'kick'.
    $this->checkTransactionResult($outcome, 'kick');

    // Fluent interface: allow call chaining.
    return $this;
}
| 308 |
|
|
| 309 |
|
/** |
| 310 |
|
* @param MessageInterface|string $mixed |