1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Bernard\Driver\MongoDB; |
4
|
|
|
|
5
|
|
|
use MongoCollection; |
6
|
|
|
use MongoDate; |
7
|
|
|
use MongoId; |
8
|
|
|
|
9
|
|
|
/** |
10
|
|
|
* Driver supporting MongoDB. |
11
|
|
|
*/ |
12
|
|
|
final class Driver implements \Bernard\Driver |
13
|
|
|
{ |
14
|
|
|
private $messages; |
15
|
|
|
private $queues; |
16
|
|
|
|
17
|
|
|
/** |
18
|
|
|
* @param MongoCollection $queues Collection where queues will be stored |
19
|
|
|
* @param MongoCollection $messages Collection where messages will be stored |
20
|
|
|
*/ |
21
|
10 |
|
public function __construct(MongoCollection $queues, MongoCollection $messages) |
22
|
|
|
{ |
23
|
10 |
|
$this->queues = $queues; |
24
|
10 |
|
$this->messages = $messages; |
25
|
10 |
|
} |
26
|
|
|
|
27
|
|
|
/** |
28
|
|
|
* {@inheritdoc} |
29
|
|
|
*/ |
30
|
1 |
|
public function listQueues() |
31
|
|
|
{ |
32
|
1 |
|
return $this->queues->distinct('_id'); |
33
|
|
|
} |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* {@inheritdoc} |
37
|
|
|
*/ |
38
|
1 |
|
public function createQueue($queueName) |
39
|
|
|
{ |
40
|
1 |
|
$data = ['_id' => (string) $queueName]; |
41
|
|
|
|
42
|
1 |
|
$this->queues->update($data, $data, ['upsert' => true]); |
43
|
1 |
|
} |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* {@inheritdoc} |
47
|
|
|
*/ |
48
|
1 |
|
public function countMessages($queueName) |
49
|
|
|
{ |
50
|
1 |
|
return $this->messages->count([ |
51
|
1 |
|
'queue' => (string) $queueName, |
52
|
1 |
|
'visible' => true, |
53
|
1 |
|
]); |
54
|
|
|
} |
55
|
|
|
|
56
|
|
|
/** |
57
|
|
|
* {@inheritdoc} |
58
|
|
|
*/ |
59
|
1 |
|
public function pushMessage($queueName, $message) |
60
|
|
|
{ |
61
|
|
|
$data = [ |
62
|
1 |
|
'queue' => (string) $queueName, |
63
|
1 |
|
'message' => (string) $message, |
64
|
1 |
|
'sentAt' => new MongoDate(), |
65
|
1 |
|
'visible' => true, |
66
|
1 |
|
]; |
67
|
|
|
|
68
|
1 |
|
$this->messages->insert($data); |
69
|
1 |
|
} |
70
|
|
|
|
71
|
|
|
/** |
72
|
|
|
* {@inheritdoc} |
73
|
|
|
*/ |
74
|
2 |
|
public function popMessage($queueName, $duration = 5) |
75
|
|
|
{ |
76
|
2 |
|
$runtime = microtime(true) + $duration; |
77
|
|
|
|
78
|
2 |
|
while (microtime(true) < $runtime) { |
79
|
2 |
|
$result = $this->messages->findAndModify( |
80
|
2 |
|
['queue' => (string) $queueName, 'visible' => true], |
81
|
2 |
|
['$set' => ['visible' => false]], |
82
|
2 |
|
['message' => 1], |
83
|
2 |
|
['sort' => ['sentAt' => 1]] |
84
|
2 |
|
); |
85
|
|
|
|
86
|
2 |
|
if ($result) { |
|
|
|
|
87
|
1 |
|
return [(string) $result['message'], (string) $result['_id']]; |
88
|
|
|
} |
89
|
|
|
|
90
|
1 |
|
usleep(10000); |
91
|
1 |
|
} |
92
|
|
|
|
93
|
1 |
|
return [null, null]; |
94
|
|
|
} |
95
|
|
|
|
96
|
|
|
/** |
97
|
|
|
* {@inheritdoc} |
98
|
|
|
*/ |
99
|
1 |
|
public function acknowledgeMessage($queueName, $receipt) |
100
|
|
|
{ |
101
|
1 |
|
$this->messages->remove([ |
102
|
1 |
|
'_id' => new MongoId((string) $receipt), |
103
|
1 |
|
'queue' => (string) $queueName, |
104
|
1 |
|
]); |
105
|
1 |
|
} |
106
|
|
|
|
107
|
|
|
/** |
108
|
|
|
* {@inheritdoc} |
109
|
|
|
*/ |
110
|
1 |
|
public function peekQueue($queueName, $index = 0, $limit = 20) |
111
|
|
|
{ |
112
|
1 |
|
$query = ['queue' => (string) $queueName, 'visible' => true]; |
113
|
1 |
|
$fields = ['_id' => 0, 'message' => 1]; |
114
|
|
|
|
115
|
1 |
|
$cursor = $this->messages |
116
|
1 |
|
->find($query, $fields) |
117
|
1 |
|
->sort(['sentAt' => 1]) |
118
|
1 |
|
->limit($limit) |
119
|
1 |
|
->skip($index) |
120
|
1 |
|
; |
121
|
|
|
|
122
|
1 |
|
$mapper = function ($result) { |
123
|
1 |
|
return (string) $result['message']; |
124
|
1 |
|
}; |
125
|
|
|
|
126
|
1 |
|
return array_map($mapper, iterator_to_array($cursor, false)); |
127
|
|
|
} |
128
|
|
|
|
129
|
|
|
/** |
130
|
|
|
* {@inheritdoc} |
131
|
|
|
*/ |
132
|
1 |
|
public function removeQueue($queueName) |
133
|
|
|
{ |
134
|
1 |
|
$this->queues->remove(['_id' => $queueName]); |
135
|
1 |
|
$this->messages->remove(['queue' => (string) $queueName]); |
136
|
1 |
|
} |
137
|
|
|
|
138
|
|
|
/** |
139
|
|
|
* {@inheritdoc} |
140
|
|
|
*/ |
141
|
1 |
|
public function info() |
142
|
|
|
{ |
143
|
|
|
return [ |
144
|
1 |
|
'messages' => (string) $this->messages, |
145
|
1 |
|
'queues' => (string) $this->queues, |
146
|
1 |
|
]; |
147
|
|
|
} |
148
|
|
|
} |
149
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.