|
@@ 29-33 (lines=5) @@
|
| 26 |
|
/**
 * Hands a serialized command to storeCommandAndCheckIfIdReserved() and, when
 * that call reports a falsy result, enqueues the command ID.
 *
 * Enqueueing issues three operations through one client pipeline: the ID is
 * reserved, the command status is set to STATUS_QUEUED, and the ID is pushed
 * onto the ":{$queueName}:queue" list.
 *
 * NOTE(review): judging by the helper's name, a truthy result presumably means
 * the ID was already reserved, in which case nothing is enqueued here —
 * confirm against the helper's implementation.
 *
 * @param string $queueName  Name of the target queue.
 * @param string $id         Identifier of the command.
 * @param string $serialized Serialized representation of the command.
 */
public function queueCommand(string $queueName, string $id, string $serialized)
{
    $alreadyReserved = $this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized);
    if ($alreadyReserved) {
        return;
    }

    $enqueue = function (ClientContextInterface $pipeline) use ($queueName, $id) {
        self::cReserveCommandId($pipeline, $queueName, $id);
        self::cUpdateCommandStatus($pipeline, $queueName, $id, self::STATUS_QUEUED);
        $pipeline->lpush(":{$queueName}:queue", [$id]);
    };

    $this->client->pipeline($enqueue);
}
| 36 |
|
|
|
@@ 56-60 (lines=5) @@
|
| 53 |
|
return $this->awaitCommand($queueName, $timeout); |
| 54 |
|
} |
| 55 |
|
/* @var $id string */ |
| 56 |
|
list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
| 57 |
|
self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS); |
| 58 |
|
self::cRetrieveCommand($client, $queueName, $id); |
| 59 |
|
self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); |
| 60 |
|
}); |
| 61 |
|
return new ReceivedCommand($queueName, $id, $serialized); |
| 62 |
|
} |
| 63 |
|
|