1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Bernard\Driver; |
4
|
|
|
|
5
|
|
|
use MongoCollection; |
6
|
|
|
use MongoDate; |
7
|
|
|
use MongoId; |
8
|
|
|
use Symfony\Component\OptionsResolver\OptionsResolver; |
9
|
|
|
|
10
|
|
|
/**
 * Driver supporting MongoDB.
 *
 * Queue names live in one collection (one document per queue, keyed by
 * `_id`) and messages in another. Each message document carries the fields
 * `queue`, `message`, `sentAt` and `visible`. Popping a message flips
 * `visible` to false; acknowledging it removes the document.
 *
 * @package Bernard
 */
class MongoDBDriver extends AbstractDriver
{
    /**
     * @var MongoCollection Collection holding the message documents
     */
    private $messages;

    /**
     * @var MongoCollection Collection holding one document per queue
     */
    private $queues;

    /**
     * @param MongoCollection $queues   Collection where queues will be stored
     * @param MongoCollection $messages Collection where messages will be stored
     */
    public function __construct(MongoCollection $queues, MongoCollection $messages)
    {
        $this->queues = $queues;
        $this->messages = $messages;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the distinct `_id` values of the queues collection.
     */
    public function listQueues()
    {
        return $this->queues->distinct('_id');
    }

    /**
     * {@inheritdoc}
     *
     * The queue document is written with update(); `upsert` defaults to true
     * (see configureQueueOptions()), so calling this for an existing queue
     * does not create a duplicate.
     *
     * @param string $queueName Queue name, stored as the document `_id`
     * @param array  $options   Options passed to MongoCollection::update() after validation
     */
    public function createQueue($queueName, array $options = [])
    {
        // validateQueueOptions() is not defined in this class; presumably
        // inherited from AbstractDriver -- confirm.
        $options = $this->validateQueueOptions($options);
        $data = ['_id' => (string) $queueName];

        $this->queues->update($data, $data, $options);
    }

    /**
     * {@inheritdoc}
     *
     * Only messages that are still visible (not yet popped) are counted.
     */
    public function countMessages($queueName)
    {
        return $this->messages->count([
            'queue' => (string) $queueName,
            'visible' => true,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * @param string $queueName Queue the message belongs to
     * @param string $message   Message payload, stored as a string
     * @param array  $options   Options passed to MongoCollection::insert() after validation
     */
    public function pushMessage($queueName, $message, array $options = [])
    {
        // validatePushOptions() is not defined in this class; presumably
        // inherited from AbstractDriver -- confirm.
        $options = $this->validatePushOptions($options);
        $data = [
            'queue' => (string) $queueName,
            'message' => (string) $message,
            'sentAt' => new MongoDate(),
            'visible' => true,
        ];

        $this->messages->insert($data, $options);
    }

    /**
     * {@inheritdoc}
     *
     * Polls for up to $duration seconds, sleeping 10ms between attempts.
     * Each attempt claims the oldest visible message (by `sentAt`) via
     * findAndModify(), marking it invisible in the same operation.
     *
     * If $duration is zero or negative the loop body never runs and
     * [null, null] is returned without querying.
     *
     * @param string    $queueName Queue to pop from
     * @param int|float $duration  Maximum number of seconds to keep polling
     *
     * @return array [message, receipt] where receipt is the document id as a
     *               string, or [null, null] when nothing was found in time
     */
    public function popMessage($queueName, $duration = 5)
    {
        $runtime = microtime(true) + $duration;

        while (microtime(true) < $runtime) {
            $result = $this->messages->findAndModify(
                ['queue' => (string) $queueName, 'visible' => true],
                ['$set' => ['visible' => false]],
                ['message' => 1],
                ['sort' => ['sentAt' => 1]]
            );

            // Explicit emptiness check instead of relying on the implicit
            // array-to-boolean conversion; covers both null and [].
            if (!empty($result)) {
                return [(string) $result['message'], (string) $result['_id']];
            }

            usleep(10000);
        }

        return [null, null];
    }

    /**
     * {@inheritdoc}
     *
     * Removes the message document whose id matches the receipt returned by
     * popMessage() and which belongs to the given queue.
     *
     * @param string $queueName Queue the message was popped from
     * @param string $receipt   Document id as returned by popMessage()
     */
    public function acknowledgeMessage($queueName, $receipt)
    {
        $this->messages->remove([
            '_id' => new MongoId((string) $receipt),
            'queue' => (string) $queueName,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * Returns the payloads of visible messages ordered by `sentAt` ascending,
     * without changing their visibility.
     *
     * @param string $queueName Queue to inspect
     * @param int    $index     Number of messages to skip
     * @param int    $limit     Maximum number of messages to return
     *
     * @return string[] Message payloads
     */
    public function peekQueue($queueName, $index = 0, $limit = 20)
    {
        $query = ['queue' => (string) $queueName, 'visible' => true];
        $fields = ['_id' => 0, 'message' => 1];

        $cursor = $this->messages
            ->find($query, $fields)
            ->sort(['sentAt' => 1])
            ->limit($limit)
            ->skip($index)
        ;

        $mapper = function ($result) {
            return (string) $result['message'];
        };

        // Pass false so keys are not preserved and a plain list is returned.
        return array_map($mapper, iterator_to_array($cursor, false));
    }

    /**
     * {@inheritdoc}
     *
     * Removes the queue document and every message belonging to the queue.
     *
     * @param string $queueName Queue to remove
     */
    public function removeQueue($queueName)
    {
        // createQueue() stores the id as a string, so cast here as well;
        // otherwise a non-string name would not match the stored `_id`.
        $name = (string) $queueName;

        $this->queues->remove(['_id' => $name]);
        $this->messages->remove(['queue' => $name]);
    }

    /**
     * {@inheritdoc}
     *
     * Returns the string representation of both collections.
     */
    public function info()
    {
        return [
            'messages' => (string) $this->messages,
            'queues' => (string) $this->queues,
        ];
    }

    /**
     * {@inheritdoc}
     *
     * Queue creation upserts by default.
     */
    public function configureQueueOptions(OptionsResolver $resolver)
    {
        $resolver->setDefaults([
            'upsert' => true,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * Defaults `fsync` and `j` to false and allows `socketTimeoutMS`, `w`
     * and `wTimeoutMS` to be passed through without a default.
     */
    public function configurePushOptions(OptionsResolver $resolver)
    {
        $resolver->setDefaults([
            'fsync' => false,
            'j' => false,
        ]);

        $optional = ['socketTimeoutMS', 'w', 'wTimeoutMS'];

        // BC layer: newer OptionsResolver versions expose setDefined(), older
        // ones only have setOptional(). Detect the method on the resolver
        // itself so the deprecated call is used only when nothing else exists.
        if (method_exists($resolver, 'setDefined')) {
            $resolver->setDefined($optional);
        } else {
            $resolver->setOptional($optional);
        }
    }
}
197
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.