1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Charcoal\Queue; |
4
|
|
|
|
5
|
|
|
use Exception; |
6
|
|
|
|
7
|
|
|
// From PSR-3 |
8
|
|
|
use Psr\Log\LoggerAwareInterface; |
9
|
|
|
use Psr\Log\LoggerAwareTrait; |
10
|
|
|
|
11
|
|
|
// From 'charcoal-core' |
12
|
|
|
use Charcoal\Loader\CollectionLoader; |
13
|
|
|
|
14
|
|
|
// From 'charcoal-factory' |
15
|
|
|
use Charcoal\Factory\FactoryInterface; |
16
|
|
|
|
17
|
|
|
/**
 * Abstract Queue Manager
 *
 * The queue manager is used to load queued items and batch-process them.
 *
 * ## Loading queued items
 *
 * If a "queue_id" is specified, only the items of this specific queue will be loaded.
 * Otherwise, all unprocessed queue items will be processed.
 *
 * ## Type of queue items
 *
 * The type of queue items is set in the extending concrete class with the
 * `getQueueItemClass()` method. Its return value is handed to the queue item
 * factory to obtain the prototype model (see `queueItemProto()`).
 *
 * ## Callbacks
 *
 * There are 4 available callback methods that can be set:
 *
 * - `item_callback`
 *   - Called after an item has been processed.
 *   - Arguments: `QueueModelInterface $item`
 * - `item_success_callback`
 * - `item_failure_callback`
 * - `processed_callback`
 *   - Called when the entire queue has been processed
 *   - Arguments: `array $success`, `array $failures`, `array $skipped`
 */
abstract class AbstractQueueManager implements
    QueueManagerInterface,
    LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * The queue processing rate (throttle), in items per second.
     *
     * A value of 0 (the default) disables throttling.
     *
     * @var integer
     */
    private $rate = 0;

    /**
     * The batch limit.
     *
     * A value of 0 (the default) means no limit is applied.
     *
     * @var integer
     */
    private $limit = 0;

    /**
     * The chunk size to batch the queue with.
     *
     * A value of 0 (the default) processes the queue in a single batch.
     *
     * @var integer
     */
    private $chunkSize = 0;

    /**
     * The queue ID.
     *
     * If set, then it will load only the items from this queue.
     * If NULL, load *all* queued items.
     *
     * @var mixed
     */
    private $queueId;

    /**
     * Items that were successfully processed.
     *
     * @var array
     */
    private $successItems = [];

    /**
     * Items that failed to process.
     *
     * @var array
     */
    private $failedItems = [];

    /**
     * Items that were skipped during the processing.
     *
     * @var array
     */
    private $skippedItems = [];

    /**
     * The callback routine when an item is processed (whether resolved or rejected).
     *
     * @var callable $itemCallback
     */
    private $itemCallback;

    /**
     * The callback routine when the item is resolved.
     *
     * @var callable $itemSuccessCallback
     */
    private $itemSuccessCallback;

    /**
     * The callback routine when the item is rejected.
     *
     * @var callable $itemFailureCallback
     */
    private $itemFailureCallback;

    /**
     * The callback routine when the queue is processed.
     *
     * @var callable $processedCallback
     */
    private $processedCallback;

    /**
     * The factory used to create queue item models.
     *
     * @var FactoryInterface $queueItemFactory
     */
    private $queueItemFactory;

    /**
     * Construct new queue manager.
     *
     * Required keys:
     *
     * - `logger`             — passed to {@see self::setLogger()}.
     * - `queue_item_factory` — a FactoryInterface used to create queue items.
     *
     * Optional keys (each cast to integer): `rate`, `limit`, `chunkSize`.
     *
     * @param array $data Dependencies and settings.
     */
    public function __construct(array $data = [])
    {
        $this->setLogger($data['logger']);
        $this->setQueueItemFactory($data['queue_item_factory']);

        if (isset($data['rate'])) {
            $this->rate = intval($data['rate']);
        }

        if (isset($data['limit'])) {
            $this->limit = intval($data['limit']);
        }

        if (isset($data['chunkSize'])) {
            $this->chunkSize = intval($data['chunkSize']);
        }
    }

    /**
     * Set the manager's data.
     *
     * Only the "queue_id" key is read, and only when it is set and truthy.
     *
     * @param  array $data The queue data to set.
     * @return AbstractQueueManager Chainable
     */
    public function setData(array $data)
    {
        if (isset($data['queue_id']) && $data['queue_id']) {
            $this->setQueueId($data['queue_id']);
        }

        return $this;
    }

    /**
     * Set the queue's ID.
     *
     * @param  mixed $id The unique queue identifier.
     * @return self
     */
    public function setQueueId($id)
    {
        $this->queueId = $id;
        return $this;
    }

    /**
     * Get the queue's ID.
     *
     * @return mixed
     */
    public function queueId()
    {
        return $this->queueId;
    }

    /**
     * Set the processing rate.
     *
     * @param  integer $rate The throttling / processing rate, in items per second.
     * @return self
     */
    public function setRate($rate)
    {
        $this->rate = intval($rate);
        return $this;
    }

    /**
     * Get the processing rate, in items per second.
     *
     * @return integer
     */
    public function rate()
    {
        return $this->rate;
    }

    /**
     * Set the batch limit.
     *
     * @param  integer $limit The maximum number of items to load.
     * @return self
     */
    public function setLimit($limit)
    {
        $this->limit = intval($limit);
        return $this;
    }

    /**
     * Get the batch limit.
     *
     * @return integer
     */
    public function limit()
    {
        return $this->limit;
    }

    /**
     * Set the chunk size.
     *
     * @param  integer $chunkSize The size of the chunk of items to process at the same time in the queue.
     * @return self
     */
    public function setChunkSize($chunkSize)
    {
        $this->chunkSize = intval($chunkSize);
        return $this;
    }

    /**
     * Get the chunk size.
     *
     * @return integer
     */
    public function chunkSize()
    {
        return $this->chunkSize;
    }

    /**
     * Set the callback routine when an item is processed.
     *
     * @param  callable $callback A item callback routine.
     * @return self
     */
    public function setItemCallback(callable $callback)
    {
        $this->itemCallback = $callback;
        return $this;
    }

    /**
     * Set the callback routine when the item is resolved.
     *
     * @param  callable $callback A item callback routine.
     * @return self
     */
    public function setItemSuccessCallback(callable $callback)
    {
        $this->itemSuccessCallback = $callback;
        return $this;
    }

    /**
     * Set the callback routine when the item is rejected.
     *
     * @param  callable $callback A item callback routine.
     * @return self
     */
    public function setItemFailureCallback(callable $callback)
    {
        // Store in the failure slot; writing to the success slot here would
        // clobber the success callback and leave the failure callback unset.
        $this->itemFailureCallback = $callback;
        return $this;
    }

    /**
     * Set the callback routine when the queue is processed.
     *
     * @param  callable $callback A queue callback routine.
     * @return self
     */
    public function setProcessedCallback(callable $callback)
    {
        $this->processedCallback = $callback;
        return $this;
    }

    /**
     * Process the queue.
     *
     * The queue can be processed in a single batch or, when a chunk size is set,
     * in multiple chunks to reduce memory usage.
     *
     * If no callback is passed and a self::$processedCallback is set, the latter is used.
     * The callback receives the successful, failed and skipped items, in that order.
     *
     * @param  callable $callback An optional alternative callback routine executed
     *                            after all queue items are processed.
     * @return boolean Success / Failure
     */
    public function processQueue(callable $callback = null)
    {
        if (!is_callable($callback)) {
            $callback = $this->processedCallback;
        }

        if ($this->chunkSize() > 0) {
            $totalChunks = $this->totalChunks();
            // NOTE(review): the inclusive bound runs totalChunks + 1 load/process
            // passes. Each pass reloads the first page of the filtered collection,
            // so this presumably relies on processed items dropping out of the
            // "processed = 0" filter — confirm whether the extra pass is intended,
            // as it may exceed the configured limit.
            for ($i = 0; $i <= $totalChunks; $i++) {
                $queuedItems = $this->loadQueueItems();
                $this->processItems($queuedItems);
            }
        } else {
            $queuedItems = $this->loadQueueItems();
            $this->processItems($queuedItems);
        }

        if (is_callable($callback)) {
            $callback($this->successItems, $this->failedItems, $this->skippedItems);
        }

        // Argument order must follow the placeholders: successful, skipped, failed.
        $summary = sprintf(
            '%d successful, %d skipped, %d failed',
            count($this->successItems),
            count($this->skippedItems),
            count($this->failedItems)
        );

        $queueId = $this->queueId();
        if ($queueId) {
            $this->logger->notice(sprintf(
                'Completed processing of queue [%s]: %s',
                $queueId,
                $summary
            ), [
                'manager' => get_called_class(),
            ]);
        } else {
            $this->logger->notice(sprintf(
                'Completed processing of queues: %s',
                $summary
            ), [
                'manager' => get_called_class(),
            ]);
        }

        return true;
    }

    /**
     * Process a set of queue items and sort them by outcome.
     *
     * Each item's `process()` result decides its bucket: strictly TRUE is a
     * success, strictly FALSE is a failure, anything else is counted as skipped.
     * An exception thrown by an item is logged, the item is counted as failed,
     * and the throttle delay is not applied for that item.
     *
     * @param  mixed $queuedItems The items to process.
     * @return void
     */
    private function processItems($queuedItems)
    {
        foreach ($queuedItems as $q) {
            try {
                $result = $q->process(
                    $this->itemCallback,
                    $this->itemSuccessCallback,
                    $this->itemFailureCallback
                );
                if ($result === true) {
                    $this->successItems[] = $q;
                } elseif ($result === false) {
                    $this->failedItems[] = $q;
                } else {
                    $this->skippedItems[] = $q;
                }
            } catch (Exception $e) {
                $this->logger->error(
                    sprintf('Could not process a queue item: %s', $e->getMessage()),
                    [
                        'manager' => get_called_class(),
                        'queueId' => $q['queueId'],
                        'itemId'  => $q['id'],
                    ]
                );
                $this->failedItems[] = $q;
                continue;
            }

            $this->throttle();
        }
    }

    /**
     * Throttle processing of items.
     *
     * Sleeps for one second divided by the configured rate; does nothing when
     * the rate is zero or negative.
     *
     * @return void
     */
    private function throttle()
    {
        if ($this->rate > 0) {
            // usleep() takes an integer; cast explicitly so a fractional delay
            // (e.g. a rate of 3) is truncated without an implicit-conversion
            // deprecation (PHP 8.1+) or a TypeError under strict types.
            usleep((int)(1000000 / $this->rate));
        }
    }

    /**
     * Create a queue items collection loader.
     *
     * The loader is given this manager's logger, the queue item factory and
     * the queue item prototype as its model.
     *
     * @return CollectionLoader
     */
    public function createQueueItemsLoader()
    {
        $loader = new CollectionLoader([
            'logger'  => $this->logger,
            'factory' => $this->queueItemFactory(),
            'model'   => $this->queueItemProto(),
        ]);

        return $loader;
    }

    /**
     * Configure the queue items collection loader.
     *
     * Restricts the collection to unprocessed items ("processed" = 0) whose
     * "processing_date" is earlier than the current date/time, optionally
     * narrowed to the current queue ID, ordered by ascending "queued_date".
     *
     * @param  CollectionLoader $loader The collection loader to prepare.
     * @return void
     */
    protected function configureQueueItemsLoader(CollectionLoader $loader)
    {
        $loader->addFilter([
            'property' => 'processed',
            'value'    => 0,
        ]);
        $loader->addFilter([
            'property' => 'processing_date',
            'operator' => '<',
            'value'    => date('Y-m-d H:i:s'),
        ]);

        $queueId = $this->queueId();
        if ($queueId) {
            $loader->addFilter([
                'property' => 'queue_id',
                'value'    => $queueId,
            ]);
        }

        $loader->addOrder([
            'property' => 'queued_date',
            'mode'     => 'asc',
        ]);

        // NOTE(review): static analysis reports "isConfigured" as an undefined
        // property of CollectionLoader, so this creates a dynamic property
        // (deprecated as of PHP 8.2). Nothing in this class reads it back —
        // confirm whether any external code depends on it before removing.
        $loader->isConfigured = true;
    }

    /**
     * Retrieve the items of the current queue.
     *
     * The number of items loaded is capped by the chunk size when one is set,
     * otherwise by the limit when one is set.
     *
     * @return \Charcoal\Model\Collection|array
     */
    public function loadQueueItems()
    {
        $loader = $this->createQueueItemsLoader();
        $this->configureQueueItemsLoader($loader);

        if ($this->chunkSize() > 0) {
            $loader->setNumPerPage($this->chunkSize());
        } elseif ($this->limit() > 0) {
            $loader->setNumPerPage($this->limit());
        }

        $queued = $loader->load();
        return $queued;
    }

    /**
     * Retrieve the total of queued items.
     *
     * @return integer
     */
    public function totalQueuedItems()
    {
        $loader = $this->createQueueItemsLoader();
        $this->configureQueueItemsLoader($loader);

        $total = $loader->loadCount();
        return $total;
    }

    /**
     * Retrieve the number of chunks to process.
     *
     * The queued item count is capped by the limit (when set) and divided by
     * the chunk size, rounding up. When chunking is disabled (chunk size of
     * zero or less) there are no chunks and 0 is returned.
     *
     * @return integer
     */
    public function totalChunks()
    {
        $chunkSize = $this->chunkSize();
        if ($chunkSize <= 0) {
            // Avoid dividing by zero (DivisionByZeroError on PHP 8).
            return 0;
        }

        $total = $this->totalQueuedItems();

        $limit = $this->limit();
        if ($limit > 0 && $total > $limit) {
            $total = $limit;
        }

        return (int)ceil($total / $chunkSize);
    }

    /**
     * Retrieve the queue item prototype model.
     *
     * Obtained from the queue item factory using the class name provided by
     * {@see self::getQueueItemClass()}.
     *
     * @return QueueItemInterface
     */
    public function queueItemProto()
    {
        return $this->queueItemFactory()->get($this->getQueueItemClass());
    }

    /**
     * Retrieve the class name of the queue item model.
     *
     * @return string
     */
    abstract public function getQueueItemClass();

    /**
     * Retrieve the queue item factory.
     *
     * @return FactoryInterface
     */
    protected function queueItemFactory()
    {
        return $this->queueItemFactory;
    }

    /**
     * Set the queue item factory.
     *
     * @param  FactoryInterface $factory The factory used to create queue items.
     * @return self
     */
    private function setQueueItemFactory(FactoryInterface $factory)
    {
        $this->queueItemFactory = $factory;
        return $this;
    }
}
542
|
|
|
|
An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.
If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.