1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Charcoal\Queue; |
4
|
|
|
|
5
|
|
|
// PSR-3 (logger) dependencies |
6
|
|
|
use \Psr\Log\LoggerAwareInterface; |
7
|
|
|
use \Psr\Log\LoggerAwareTrait; |
8
|
|
|
|
9
|
|
|
// From `charcoal-core` |
10
|
|
|
use \Charcoal\Loader\CollectionLoader; |
11
|
|
|
|
12
|
|
|
// From `charcoal-factory` |
13
|
|
|
use \Charcoal\Factory\FactoryInterface; |
14
|
|
|
|
15
|
|
|
/** |
16
|
|
|
* Abstract Queue Manager |
17
|
|
|
* |
18
|
|
|
* The queue manager is used to load queued items and batch-process them. |
19
|
|
|
* |
20
|
|
|
* ## Loading queued items |
21
|
|
|
* |
22
|
|
|
* If a "queue_id" is specified, only the item for this specific queue will be loaded. |
23
|
|
|
* Otherwise, all unprocessed queue items will be processed. |
24
|
|
|
* |
25
|
|
|
* ## Type of queue items |
26
|
|
|
* |
27
|
|
|
* The type of queue items can be set in extended concrete class with the |
28
|
|
|
* `queue_item_proto()` method. This method should return a QueueItemInterface instance. |
29
|
|
|
* |
30
|
|
|
* ## Callbacks |
31
|
|
|
* |
32
|
|
|
* There are 4 available callback methods that can be set: |
33
|
|
|
* |
34
|
|
|
* - `item_callback` |
35
|
|
|
* - Called after an item has been processed. |
36
|
|
|
* - Arguments: `QueueModelInterface $item` |
37
|
|
|
* - `item_success_callback` |
38
|
|
|
* - `item_failure_callback` |
39
|
|
|
* - `processed_callback` |
40
|
|
|
* - Called when the entire queue has been processed |
41
|
|
|
* - Arguments: `array $success`, `array $failures` |
42
|
|
|
*/ |
43
|
|
|
abstract class AbstractQueueManager implements |
44
|
|
|
QueueManagerInterface, |
45
|
|
|
LoggerAwareInterface |
46
|
|
|
{ |
47
|
|
|
use LoggerAwareTrait; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* The queue ID. |
51
|
|
|
* |
52
|
|
|
* If set, then it will load only the items from this queue. |
53
|
|
|
* If NULL, load *all* queued items. |
54
|
|
|
* |
55
|
|
|
* @var mixed $queueId |
56
|
|
|
*/ |
57
|
|
|
private $queueId; |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* The callback routine when an item is processed (whether resolved or rejected). |
61
|
|
|
* |
62
|
|
|
* @var callable $itemCallback |
63
|
|
|
*/ |
64
|
|
|
private $itemCallback; |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* The callback routine when the item is resolved. |
68
|
|
|
* |
69
|
|
|
* @var callable $itemSuccessCallback |
70
|
|
|
*/ |
71
|
|
|
private $itemSuccessCallback; |
72
|
|
|
|
73
|
|
|
/** |
74
|
|
|
* The callback routine when the item is rejected. |
75
|
|
|
* |
76
|
|
|
* @var callable $itemFailureCallback |
77
|
|
|
*/ |
78
|
|
|
private $itemFailureCallback; |
79
|
|
|
|
80
|
|
|
/** |
81
|
|
|
* The callback routine when the queue is processed. |
82
|
|
|
* |
83
|
|
|
* @var callable $processedCallback |
84
|
|
|
*/ |
85
|
|
|
private $processedCallback; |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* @var FactoryInterface $queueItemFactory |
89
|
|
|
*/ |
90
|
|
|
private $queueItemFactory; |
91
|
|
|
|
92
|
|
|
/** |
93
|
|
|
* Construct new queue manager. |
94
|
|
|
* |
95
|
|
|
* @param array $data Dependencies and settings. |
96
|
|
|
*/ |
97
|
|
|
public function __construct(array $data = []) |
98
|
|
|
{ |
99
|
|
|
$this->setLogger($data['logger']); |
100
|
|
|
$this->setQueueItemFactory($data['queue_item_factory']); |
101
|
|
|
} |
102
|
|
|
|
103
|
|
|
/** |
104
|
|
|
* @param FactoryInterface $factory The factory used to create queue items. |
105
|
|
|
* @return QueueItemInterface Chainable |
106
|
|
|
*/ |
107
|
|
|
protected function setQueueItemFactory(FactoryInterface $factory) |
108
|
|
|
{ |
109
|
|
|
$this->queueItemFactory = $factory; |
110
|
|
|
return $this; |
111
|
|
|
} |
112
|
|
|
|
113
|
|
|
/** |
114
|
|
|
* @return FactoryInterface |
115
|
|
|
*/ |
116
|
|
|
protected function queueItemFactory() |
117
|
|
|
{ |
118
|
|
|
return $this->queueItemFactory; |
119
|
|
|
} |
120
|
|
|
|
121
|
|
|
/** |
122
|
|
|
* Set the manager's data. |
123
|
|
|
* |
124
|
|
|
* @param array $data The queue data to set. |
125
|
|
|
* @return AbstractQueueManager Chainable |
126
|
|
|
*/ |
127
|
|
|
public function setData(array $data) |
128
|
|
|
{ |
129
|
|
|
if (isset($data['queue_id']) && $data['queue_id']) { |
130
|
|
|
$this->setQueueId($data['queue_id']); |
131
|
|
|
} |
132
|
|
|
|
133
|
|
|
return $this; |
134
|
|
|
} |
135
|
|
|
|
136
|
|
|
/** |
137
|
|
|
* Set the queue's ID. |
138
|
|
|
* |
139
|
|
|
* @param mixed $id The unique queue identifier. |
140
|
|
|
* @return AbstractQueueManager Chainable |
141
|
|
|
*/ |
142
|
|
|
public function setQueueId($id) |
143
|
|
|
{ |
144
|
|
|
$this->queueId = $id; |
145
|
|
|
return $this; |
146
|
|
|
} |
147
|
|
|
|
148
|
|
|
/** |
149
|
|
|
* Get the queue's ID. |
150
|
|
|
* |
151
|
|
|
* @return mixed |
152
|
|
|
*/ |
153
|
|
|
public function queueId() |
154
|
|
|
{ |
155
|
|
|
return $this->queueId; |
156
|
|
|
} |
157
|
|
|
|
158
|
|
|
/** |
159
|
|
|
* Set the callback routine when an item is processed. |
160
|
|
|
* |
161
|
|
|
* @param callable $callback A item callback routine. |
162
|
|
|
* @return QueueManagerInterface Chainable |
163
|
|
|
*/ |
164
|
|
|
public function setItemCallback(callable $callback) |
165
|
|
|
{ |
166
|
|
|
$this->itemCallback = $callback; |
167
|
|
|
return $this; |
168
|
|
|
} |
169
|
|
|
|
170
|
|
|
/** |
171
|
|
|
* Set the callback routine when the item is resolved. |
172
|
|
|
* |
173
|
|
|
* @param callable $callback A item callback routine. |
174
|
|
|
* @return QueueManagerInterface Chainable |
175
|
|
|
*/ |
176
|
|
|
public function setItemSuccessCallback(callable $callback) |
177
|
|
|
{ |
178
|
|
|
$this->itemSuccessCallback = $callback; |
179
|
|
|
return $this; |
180
|
|
|
} |
181
|
|
|
|
182
|
|
|
/** |
183
|
|
|
* Set the callback routine when the item is rejected. |
184
|
|
|
* |
185
|
|
|
* @param callable $callback A item callback routine. |
186
|
|
|
* @return QueueManagerInterface Chainable |
187
|
|
|
*/ |
188
|
|
|
public function setItemFailureCallback(callable $callback) |
189
|
|
|
{ |
190
|
|
|
$this->itemSuccessCallback = $callback; |
191
|
|
|
return $this; |
192
|
|
|
} |
193
|
|
|
|
194
|
|
|
/** |
195
|
|
|
* Set the callback routine when the queue is processed. |
196
|
|
|
* |
197
|
|
|
* @param callable $callback A queue callback routine. |
198
|
|
|
* @return QueueManagerInterface Chainable |
199
|
|
|
*/ |
200
|
|
|
public function setProcessedCallback(callable $callback) |
201
|
|
|
{ |
202
|
|
|
$this->processedCallback = $callback; |
203
|
|
|
return $this; |
204
|
|
|
} |
205
|
|
|
|
206
|
|
|
/** |
207
|
|
|
* Process the items of the queue. |
208
|
|
|
* |
209
|
|
|
* If no callback is passed and a self::$processedCallback is set, the latter is used. |
210
|
|
|
* |
211
|
|
|
* @param callable $callback An optional alternative callback routine executed |
212
|
|
|
* after all queue items are processed. |
213
|
|
|
* @return boolean Success / Failure |
214
|
|
|
*/ |
215
|
|
|
public function processQueue(callable $callback = null) |
216
|
|
|
{ |
217
|
|
|
$queued = $this->loadQueueItems(); |
218
|
|
|
|
219
|
|
|
if (!is_callable($callback)) { |
220
|
|
|
$callback = $this->processedCallback; |
221
|
|
|
} |
222
|
|
|
|
223
|
|
|
$success = []; |
224
|
|
|
$failures = []; |
225
|
|
|
$skipped = []; |
226
|
|
|
foreach ($queued as $q) { |
227
|
|
|
try { |
228
|
|
|
$res = $q->process($this->itemCallback, $this->itemSuccessCallback, $this->itemFailureCallback); |
229
|
|
|
if ($res === true) { |
230
|
|
|
$success[] = $q; |
231
|
|
|
} elseif ($res === false) { |
232
|
|
|
$failures[] = $q; |
233
|
|
|
} else { |
234
|
|
|
$skipped[] = $q; |
235
|
|
|
} |
236
|
|
|
} catch (Exception $e) { |
|
|
|
|
237
|
|
|
$this->logger->error( |
238
|
|
|
sprintf('Could not process a queue item: %s', $e->getMessage()) |
239
|
|
|
); |
240
|
|
|
$failures[] = $q; |
241
|
|
|
continue; |
242
|
|
|
} |
243
|
|
|
} |
244
|
|
|
|
245
|
|
|
if (is_callable($callback)) { |
246
|
|
|
$callback($success, $failures, $skipped); |
247
|
|
|
} |
248
|
|
|
|
249
|
|
|
return true; |
250
|
|
|
} |
251
|
|
|
|
252
|
|
|
/** |
253
|
|
|
* Retrieve the items of the current queue. |
254
|
|
|
* |
255
|
|
|
* @return Collection |
256
|
|
|
*/ |
257
|
|
|
public function loadQueueItems() |
258
|
|
|
{ |
259
|
|
|
$loader = new CollectionLoader([ |
260
|
|
|
'logger' => $this->logger, |
261
|
|
|
'factory' => $this->queueItemFactory() |
262
|
|
|
]); |
263
|
|
|
$loader->setModel($this->queueItemProto()); |
|
|
|
|
264
|
|
|
$loader->addFilter([ |
265
|
|
|
'property' => 'processed', |
266
|
|
|
'val' => 0 |
267
|
|
|
]); |
268
|
|
|
$loader->addFilter([ |
269
|
|
|
'property' => 'processing_date', |
270
|
|
|
'val' => date('Y-m-d H:i:s'), |
271
|
|
|
'operator' => '<' |
272
|
|
|
]); |
273
|
|
|
|
274
|
|
|
$queueId = $this->queueId(); |
275
|
|
|
if ($queueId) { |
276
|
|
|
$loader->addFilter([ |
277
|
|
|
'property' => 'queue_id', |
278
|
|
|
'val' => $queueId |
279
|
|
|
]); |
280
|
|
|
} |
281
|
|
|
|
282
|
|
|
$loader->addOrder([ |
283
|
|
|
'property' => 'queued_date', |
284
|
|
|
'mode' => 'asc' |
285
|
|
|
]); |
286
|
|
|
$queued = $loader->load(); |
287
|
|
|
|
288
|
|
|
return $queued; |
|
|
|
|
289
|
|
|
} |
290
|
|
|
|
291
|
|
|
/** |
292
|
|
|
* Retrieve the queue item's model. |
293
|
|
|
* |
294
|
|
|
* @return QueueItemInterface |
295
|
|
|
*/ |
296
|
|
|
abstract public function queueItemProto(); |
297
|
|
|
} |
298
|
|
|
|
Scrutinizer analyzes your
composer.json
/composer.lock
file if available to determine the classes, and functions that are defined by your dependencies.It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.