Passed
Push — master ( b58afd...9a1e08 )
by Mathieu
11:19 queued 11s
created

AbstractQueueManager::configureQueueItemsLoader()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 27
rs 9.488
c 0
b 0
f 0
cc 2
nc 2
nop 1
1
<?php
2
3
namespace Charcoal\Queue;
4
5
use Exception;
6
7
// From PSR-3
8
use Psr\Log\LoggerAwareInterface;
9
use Psr\Log\LoggerAwareTrait;
10
11
// From 'charcoal-core'
12
use Charcoal\Loader\CollectionLoader;
13
14
// From 'charcoal-factory'
15
use Charcoal\Factory\FactoryInterface;
16
17
/**
18
 * Abstract Queue Manager
19
 *
20
 * The queue manager is used to load queued items and batch-process them.
21
 *
22
 * ## Loading queued items
23
 *
24
 * If a "queue_id" is specified, only the items for this specific queue will be loaded.
25
 * Otherwise, all unprocessed queue items will be processed.
26
 *
27
 * ## Type of queue items
28
 *
29
 * The type of queue items can be set in extended concrete class with the
30
 * `getQueueItemClass()` method. This method should return the class name of the queue item model.
31
 *
32
 * ## Callbacks
33
 *
34
 * There are 4 available callback methods that can be set:
35
 *
36
 * - `item_callback`
37
 *   - Called after an item has been processed.
38
 *   - Arguments: `QueueModelInterface $item`
39
 * - `item_success_callback`
40
 * - `item_failure_callback`
41
 * - `processed_callback`
42
 *   - Called when the entire queue has been processed
43
 *   - Arguments: `array $success`, `array $failures`, `array $skipped`
44
 */
45
abstract class AbstractQueueManager implements
46
    QueueManagerInterface,
47
    LoggerAwareInterface
48
{
49
    use LoggerAwareTrait;
50
51
    /**
     * Throttle: number of items to process per second.
     *
     * Throttling is only applied when this value is greater than zero.
     *
     * @var integer
     */
    private $rate = 0;

    /**
     * Maximum number of items to load for a batch.
     *
     * Only applied when greater than zero.
     *
     * @var integer
     */
    private $limit = 0;

    /**
     * Number of items per chunk.
     *
     * The queue is processed in chunks only when this value is greater than zero.
     *
     * @var integer
     */
    private $chunkSize = 0;

    /**
     * Identifier of the queue to process.
     *
     * When set, only the items of that queue are loaded;
     * when empty, queued items are loaded regardless of their queue.
     *
     * @var mixed
     */
    private $queueId;

    /**
     * Items whose processing returned TRUE.
     *
     * @var array
     */
    private $successItems = [];

    /**
     * Items whose processing returned FALSE or threw an exception.
     *
     * @var array
     */
    private $failedItems = [];

    /**
     * Items whose processing returned neither TRUE nor FALSE.
     *
     * @var array
     */
    private $skippedItems = [];

    /**
     * Routine handed to every item as its "processed" callback.
     *
     * @var callable $itemCallback
     */
    private $itemCallback;

    /**
     * Routine handed to every item as its "success" callback.
     *
     * @var callable $itemSuccessCallback
     */
    private $itemSuccessCallback;

    /**
     * Routine handed to every item as its "failure" callback.
     *
     * @var callable $itemFailureCallback
     */
    private $itemFailureCallback;

    /**
     * Default routine invoked once the whole queue has been processed.
     *
     * @var callable $processedCallback
     */
    private $processedCallback;

    /**
     * Factory used to create the queue item models.
     *
     * @var FactoryInterface $queueItemFactory
     */
    private $queueItemFactory;
135
136
    /**
     * Build a queue manager from its dependencies and settings.
     *
     * The "logger" and "queue_item_factory" entries are read unconditionally.
     * The "rate", "limit" and "chunkSize" entries are optional and cast to integers.
     *
     * @param array $data Dependencies and settings.
     */
    public function __construct(array $data = [])
    {
        $this->setLogger($data['logger']);
        $this->setQueueItemFactory($data['queue_item_factory']);

        // Each optional setting is stored on the property of the same name.
        foreach (['rate', 'limit', 'chunkSize'] as $option) {
            if (isset($data[$option])) {
                $this->{$option} = intval($data[$option]);
            }
        }
    }
158
159
    /**
     * Apply runtime data to the manager.
     *
     * Only a non-empty "queue_id" entry is used; everything else is ignored.
     *
     * @param array $data The queue data to set.
     * @return self
     */
    public function setData(array $data)
    {
        if (!empty($data['queue_id'])) {
            $this->setQueueId($data['queue_id']);
        }

        return $this;
    }
173
174
    /**
     * Restrict the manager to a single queue.
     *
     * @param mixed $id The unique queue identifier.
     * @return self
     */
    public function setQueueId($id)
    {
        $this->queueId = $id;

        return $this;
    }
185
186
    /**
     * Retrieve the identifier of the queue being processed, if any.
     *
     * @return mixed
     */
    public function queueId()
    {
        return $this->queueId;
    }
195
196
    /**
     * Set the processing rate (throttle).
     *
     * @param integer $rate The throttling / processing rate, in items per second.
     * @return self
     */
    public function setRate($rate)
    {
        $this->rate = (int)$rate;

        return $this;
    }
205
206
    /**
     * Retrieve the processing rate, in items per second.
     *
     * @return integer
     */
    public function rate()
    {
        return $this->rate;
    }
213
214
    /**
     * Set the batch limit.
     *
     * @param integer $limit The maximum number of items to load.
     * @return self
     */
    public function setLimit($limit)
    {
        $this->limit = (int)$limit;

        return $this;
    }
223
224
    /**
     * Retrieve the batch limit.
     *
     * @return integer
     */
    public function limit()
    {
        return $this->limit;
    }
231
232
    /**
     * Set the chunk size used to batch the queue.
     *
     * @param integer $chunkSize The size of the chunk of items to process at the same time in the queue.
     * @return self
     */
    public function setChunkSize($chunkSize)
    {
        $this->chunkSize = (int)$chunkSize;

        return $this;
    }
241
242
    /**
     * Retrieve the chunk size used to batch the queue.
     *
     * @return integer
     */
    public function chunkSize()
    {
        return $this->chunkSize;
    }
249
250
    /**
     * Register the routine passed to each item as its "processed" callback.
     *
     * @param callable $callback A item callback routine.
     * @return self
     */
    public function setItemCallback(callable $callback)
    {
        $this->itemCallback = $callback;

        return $this;
    }
261
262
    /**
     * Register the routine passed to each item as its "success" callback.
     *
     * @param callable $callback A item callback routine.
     * @return self
     */
    public function setItemSuccessCallback(callable $callback)
    {
        $this->itemSuccessCallback = $callback;

        return $this;
    }
273
274
    /**
     * Set the callback routine when the item is rejected.
     *
     * The routine is stored separately from the success callback and is passed
     * to each queue item as the third argument of its `process()` call.
     *
     * @param callable $callback A item callback routine.
     * @return self
     */
    public function setItemFailureCallback(callable $callback)
    {
        $this->itemFailureCallback = $callback;

        return $this;
    }
285
286
    /**
     * Register the default routine invoked once the queue has been processed.
     *
     * @param callable $callback A queue callback routine.
     * @return self
     */
    public function setProcessedCallback(callable $callback)
    {
        $this->processedCallback = $callback;

        return $this;
    }
297
298
    /**
     * Process the queue.
     *
     * The queue is processed in a single batch, or in multiple chunks when a
     * positive chunk size is set. If no callback is passed and a
     * self::$processedCallback is set, the latter is used.
     *
     * The callback receives three arrays, in this order: the successful items,
     * the failed items and the skipped items.
     *
     * @param  callable $callback An optional alternative callback routine executed
     *                            after all queue items are processed.
     * @return boolean  Always TRUE.
     */
    public function processQueue(callable $callback = null)
    {
        if (!is_callable($callback)) {
            $callback = $this->processedCallback;
        }

        if ($this->chunkSize() > 0) {
            // Run exactly the number of chunks computed by totalChunks(); that count
            // is already capped by the batch limit, so an extra pass could exceed it.
            $totalChunks = $this->totalChunks();
            for ($i = 0; $i < $totalChunks; $i++) {
                $this->processItems($this->loadQueueItems());
            }
        } else {
            $this->processItems($this->loadQueueItems());
        }

        if (is_callable($callback)) {
            $callback($this->successItems, $this->failedItems, $this->skippedItems);
        }

        // Argument order must follow the placeholders: successful, skipped, failed.
        $summary = sprintf(
            '%d successful, %d skipped, %d failed',
            count($this->successItems),
            count($this->skippedItems),
            count($this->failedItems)
        );

        $queueId = $this->queueId();
        if ($queueId) {
            $message = sprintf(
                'Completed processing of queue [%s]: %s',
                $queueId,
                $summary
            );
        } else {
            $message = sprintf(
                'Completed processing of queues: %s',
                $summary
            );
        }

        $this->logger->notice($message, [
            'manager' => get_called_class(),
        ]);

        return true;
    }
356
357
    /**
     * Process a batch of queue items and sort each one by outcome.
     *
     * An item is recorded as successful when its processing returns TRUE, as failed
     * when it returns FALSE or throws, and as skipped for any other result.
     * Throttling is applied after every item that did not throw.
     *
     * @param mixed $queuedItems The items to process.
     * @return void
     */
    private function processItems($queuedItems)
    {
        foreach ($queuedItems as $item) {
            try {
                $outcome = $item->process(
                    $this->itemCallback,
                    $this->itemSuccessCallback,
                    $this->itemFailureCallback
                );
            } catch (Exception $e) {
                $this->logger->error(
                    sprintf('Could not process a queue item: %s', $e->getMessage()),
                    [
                        'manager' => get_called_class(),
                        'queueId' => $item['queueId'],
                        'itemId'  => $item['id'],
                    ]
                );
                $this->failedItems[] = $item;

                // An item that threw is not throttled.
                continue;
            }

            if ($outcome === true) {
                $this->successItems[] = $item;
            } elseif ($outcome === false) {
                $this->failedItems[] = $item;
            } else {
                $this->skippedItems[] = $item;
            }

            $this->throttle();
        }
    }
393
394
    /**
     * Throttle processing of items.
     *
     * Pauses for one second divided by the processing rate. Nothing happens
     * when the rate is zero or negative.
     *
     * @return void
     */
    private function throttle()
    {
        if ($this->rate > 0) {
            // usleep() expects whole microseconds; the division usually yields
            // a fractional float, so truncate it explicitly.
            usleep((int)(1000000 / $this->rate));
        }
    }
405
406
    /**
     * Instantiate a collection loader for the queue item model.
     *
     * The loader is given this manager's logger, the queue item factory and
     * the queue item prototype.
     *
     * @return CollectionLoader
     */
    public function createQueueItemsLoader()
    {
        return new CollectionLoader([
            'logger'  => $this->logger,
            'factory' => $this->queueItemFactory(),
            'model'   => $this->queueItemProto(),
        ]);
    }
421
422
    /**
     * Configure the queue items collection loader.
     *
     * Adds filters so that only items with "processed" equal to 0 and a
     * "processing_date" earlier than the current date/time are loaded, optionally
     * restricted to the current queue ID, and orders them by ascending "queued_date".
     *
     * @param  CollectionLoader $loader The collection loader to prepare.
     * @return void
     */
    protected function configureQueueItemsLoader(CollectionLoader $loader)
    {
        $loader->addFilter([
            'property' => 'processed',
            'value'    => 0,
        ]);
        $loader->addFilter([
            'property' => 'processing_date',
            'operator' => '<',
            'value'    => date('Y-m-d H:i:s'),
        ]);

        $queueId = $this->queueId();
        if ($queueId) {
            $loader->addFilter([
                'property' => 'queue_id',
                'value'    => $queueId,
            ]);
        }

        $loader->addOrder([
            'property' => 'queued_date',
            'mode'     => 'asc',
        ]);
    }
455
456
    /**
     * Load the next batch of queued items.
     *
     * The batch size is the chunk size when it is positive, otherwise the batch
     * limit when it is positive; when neither is, no page size is set on the loader.
     *
     * @return \Charcoal\Model\Collection|array
     */
    public function loadQueueItems()
    {
        $loader = $this->createQueueItemsLoader();
        $this->configureQueueItemsLoader($loader);

        // The chunk size takes precedence over the batch limit.
        $perPage = $this->chunkSize();
        if ($perPage <= 0) {
            $perPage = $this->limit();
        }

        if ($perPage > 0) {
            $loader->setNumPerPage($perPage);
        }

        return $loader->load();
    }
475
476
    /**
     * Count the queued items matching the loader configuration.
     *
     * @return integer
     */
    public function totalQueuedItems()
    {
        $loader = $this->createQueueItemsLoader();
        $this->configureQueueItemsLoader($loader);

        return $loader->loadCount();
    }
489
490
    /**
     * Retrieve the number of chunks to process.
     *
     * The number of queued items is first capped by the batch limit (when positive),
     * then divided by the chunk size and rounded up. When the chunk size is zero
     * or negative, chunking is disabled and 0 is returned.
     *
     * @return integer
     */
    public function totalChunks()
    {
        $chunkSize = $this->chunkSize();
        if ($chunkSize <= 0) {
            // Guard against a division by zero: no chunk size means no chunks.
            return 0;
        }

        $total = $this->totalQueuedItems();

        $limit = $this->limit();
        if ($limit > 0 && $total > $limit) {
            $total = $limit;
        }

        return (int)ceil($total / $chunkSize);
    }
506
507
    /**
     * Obtain the queue item prototype from the factory.
     *
     * The model is looked up by the class name given by getQueueItemClass().
     *
     * @return QueueItemInterface
     */
    public function queueItemProto()
    {
        $itemClass = $this->getQueueItemClass();

        return $this->queueItemFactory()->get($itemClass);
    }
516
517
    /**
     * Provide the class name of the queue item model.
     *
     * Concrete managers implement this; the returned name is handed to the
     * queue item factory to obtain the prototype model.
     *
     * @return string
     */
    abstract public function getQueueItemClass();
523
524
    /**
     * Retrieve the factory used to create queue items.
     *
     * @return FactoryInterface
     */
    protected function queueItemFactory()
    {
        return $this->queueItemFactory;
    }
531
532
    /**
     * Store the factory used to create queue items.
     *
     * @param FactoryInterface $factory The factory used to create queue items.
     * @return self
     */
    private function setQueueItemFactory(FactoryInterface $factory)
    {
        $this->queueItemFactory = $factory;

        return $this;
    }
541
}
542