| @@ 74-94 (lines=21) @@ | ||
| 71 | /** |
|
| 72 | * {@inheritdoc} |
|
| 73 | */ |
|
| 74 | public function popMessage($queueName, $duration = 5) |
|
| 75 | { |
|
| 76 | $runtime = microtime(true) + $duration; |
|
| 77 | ||
| 78 | while (microtime(true) < $runtime) { |
|
| 79 | $result = $this->messages->findAndModify( |
|
| 80 | ['queue' => (string) $queueName, 'visible' => true], |
|
| 81 | ['$set' => ['visible' => false]], |
|
| 82 | ['message' => 1], |
|
| 83 | ['sort' => ['sentAt' => 1]] |
|
| 84 | ); |
|
| 85 | ||
| 86 | if ($result) { |
|
| 87 | return [(string) $result['message'], (string) $result['_id']]; |
|
| 88 | } |
|
| 89 | ||
| 90 | usleep(10000); |
|
| 91 | } |
|
| 92 | ||
| 93 | return [null, null]; |
|
| 94 | } |
|
| 95 | ||
| 96 | /** |
|
| 97 | * {@inheritdoc} |
|
| @@ 75-94 (lines=20) @@ | ||
| 72 | /** |
|
| 73 | * {@inheritdoc} |
|
| 74 | */ |
|
| 75 | public function popMessage($queueName, $duration = 5) |
|
| 76 | { |
|
| 77 | $runtime = microtime(true) + $duration; |
|
| 78 | ||
| 79 | while (microtime(true) < $runtime) { |
|
| 80 | $result = $this->messages->findOneAndUpdate( |
|
| 81 | ['queue' => (string) $queueName, 'visible' => true], |
|
| 82 | ['$set' => ['visible' => false]], |
|
| 83 | ['sort' => ['sentAt' => 1], 'projection' => ['message' => 1]] |
|
| 84 | ); |
|
| 85 | ||
| 86 | if ($result) { |
|
| 87 | return [(string) $result['message'], (string) $result['_id']]; |
|
| 88 | } |
|
| 89 | ||
| 90 | usleep(10000); |
|
| 91 | } |
|
| 92 | ||
| 93 | return [null, null]; |
|
| 94 | } |
|
| 95 | ||
| 96 | /** |
|
| 97 | * {@inheritdoc} |
|