1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Bernard\Queue; |
4
|
|
|
|
5
|
|
|
use Bernard\Envelope; |
6
|
|
|
use Bernard\Queue; |
7
|
|
|
|
8
|
|
|
/**
 * Composite queue that presents several inner queues as a single queue.
 *
 * Envelopes are enqueued on the inner queue whose string form matches the
 * envelope name, and are dequeued by visiting the inner queues in turn.
 */
class RoundRobinQueue implements Queue
{
    /**
     * Inner queues keyed by their string representation.
     *
     * The internal array pointer of this property is used as the round-robin
     * cursor by dequeue(), peek() and __toString().
     *
     * @var Queue[]
     */
    protected $queues;

    /**
     * Whether close() has already been propagated to the inner queues.
     *
     * @var bool
     */
    protected $closed;

    /**
     * Maps every dequeued, not yet acknowledged envelope to the inner queue
     * it was taken from.
     *
     * @var \SplObjectStorage
     */
    protected $envelopes;

    /**
     * @param Queue[] $queues
     *
     * @throws \DomainException when $queues is empty or holds a non-Queue element
     */
    public function __construct(array $queues)
    {
        $this->validateQueues($queues);

        $this->closed = false;
        $this->envelopes = new \SplObjectStorage();
        $this->queues = $this->indexQueues($queues);
    }

    /**
     * {@inheritdoc}
     *
     * The envelope is forwarded to the inner queue named by the envelope.
     *
     * @throws \DomainException when no inner queue carries the envelope's name
     */
    public function enqueue(Envelope $envelope)
    {
        $this->verifyEnvelope($envelope);

        $target = $this->queues[$envelope->getName()];
        $target->enqueue($envelope);
    }

    /**
     * {@inheritdoc}
     *
     * Starts at the queue under the cursor and tries each inner queue at most
     * once. The cursor is advanced after every attempt, so the next call
     * starts with the queue following the last one tried.
     */
    public function dequeue()
    {
        $envelope = null;
        $total = count($this->queues);

        for ($attempts = 0; $attempts < $total; ++$attempts) {
            $queue = current($this->queues);
            $envelope = $queue->dequeue();

            // Move the cursor on, wrapping around after the last queue.
            if (false === next($this->queues)) {
                reset($this->queues);
            }

            if ($envelope) {
                // Remember the origin so acknowledge() can find it again.
                $this->envelopes->attach($envelope, $queue);
                break;
            }
        }

        return $envelope;
    }

    /**
     * {@inheritdoc}
     *
     * Closes every inner queue; calls after the first one are no-ops.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        foreach ($this->queues as $queue) {
            $queue->close();
        }

        $this->closed = true;
    }

    /**
     * {@inheritdoc}
     *
     * Peeks one envelope at a time from each inner queue in round-robin
     * order, beginning with the queue under the cursor. The first $index
     * envelopes found are skipped and at most $limit envelopes are returned.
     * The cursor itself is not moved.
     */
    public function peek($index = 0, $limit = 20)
    {
        $names = array_keys($this->queues);
        $total = count($names);

        // Locate the cursor position directly instead of spinning an
        // iterator up to it; fall back to the first queue if the cursor
        // does not point at a valid element.
        $position = array_search(key($this->queues), $names, true);
        if (false === $position) {
            $position = 0;
        }

        $offsets = array_fill_keys($names, 0); // next position to peek per queue
        $drained = [];                         // names of queues with nothing left
        $envelopes = [];
        $skipped = 0;

        while (count($envelopes) < $limit && count($drained) < $total) {
            $name = $names[$position];
            $position = ($position + 1) % $total;

            $peeked = $this->queues[$name]->peek($offsets[$name], 1);
            if (!$peeked) {
                $drained[$name] = true;
                continue;
            }

            // Always step past the envelope just seen, whether it is skipped
            // or collected; otherwise the same envelope would be returned
            // again on the next pass over this queue.
            ++$offsets[$name];

            if ($skipped < $index) {
                ++$skipped;
                continue;
            }

            $envelopes[] = array_shift($peeked);
        }

        return $envelopes;
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the inner queue the envelope was dequeued from and then
     * forgets the envelope.
     *
     * @throws \DomainException when the envelope was not handed out by dequeue()
     */
    public function acknowledge(Envelope $envelope)
    {
        if (!$this->envelopes->contains($envelope)) {
            throw new \DomainException(
                'Unrecognized queue specified: '.$envelope->getName()
            );
        }

        $origin = $this->envelopes[$envelope];
        $origin->acknowledge($envelope);
        $this->envelopes->detach($envelope);
    }

    /**
     * {@inheritdoc}
     *
     * Returns the string form of the queue currently under the cursor.
     */
    public function __toString()
    {
        $active = current($this->queues);

        return (string) $active;
    }

    /**
     * Sums the counts of all inner queues.
     *
     * @return int
     */
    public function count()
    {
        $sizes = array_map('count', $this->queues);

        return array_sum($sizes);
    }

    /**
     * Ensures the list is non-empty and contains only Queue instances.
     *
     * @param Queue[] $queues
     *
     * @throws \DomainException when either condition is violated
     */
    protected function validateQueues(array $queues)
    {
        if (empty($queues)) {
            throw new \DomainException('$queues cannot be empty');
        }

        foreach ($queues as $queue) {
            if (!$queue instanceof Queue) {
                throw new \DomainException('All elements of $queues must implement Queue');
            }
        }
    }

    /**
     * Re-keys the given queues by their string representation.
     *
     * @param Queue[] $queues
     *
     * @return Queue[]
     */
    protected function indexQueues(array $queues)
    {
        $names = array_map('strval', $queues);

        return array_combine($names, $queues);
    }

    /**
     * Checks that the envelope names one of the inner queues.
     *
     * @param Envelope $envelope
     *
     * @throws \DomainException when no inner queue has that name
     */
    protected function verifyEnvelope(Envelope $envelope)
    {
        $queue = $envelope->getName();

        if (!isset($this->queues[$queue])) {
            throw new \DomainException('Unrecognized queue specified: '.$queue);
        }
    }
}
206
|
|
|
|
This check looks for `for` loops that have no statements or where all statements have been commented out. This may be the result of changes made for debugging, or the code may simply be obsolete. Consider removing the loop.