AbstractQueueManager   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 513
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Importance

Changes 0
Metric Value
wmc 46
lcom 1
cbo 3
dl 0
loc 513
rs 8.72
c 0
b 0
f 0

26 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 17 4
A setData() 0 8 3
A setQueueId() 0 5 1
A queueId() 0 4 1
A setRate() 0 5 1
A rate() 0 4 1
A setLimit() 0 5 1
A limit() 0 4 1
A setChunkSize() 0 5 1
A chunkSize() 0 4 1
A setItemCallback() 0 5 1
A setItemSuccessCallback() 0 5 1
A setItemFailureCallback() 0 5 1
A setProcessedCallback() 0 5 1
B processQueue() 0 48 6
B processItems() 0 47 6
A throttle() 0 6 2
A createQueueItemsLoader() 0 10 1
A configureQueueItemsLoader() 0 30 2
A loadQueueItems() 0 13 3
A totalQueuedItems() 0 7 1
A totalChunks() 0 11 3
A queueItemProto() 0 4 1
getQueueItemClass() 0 1 ?
A queueItemFactory() 0 4 1
A setQueueItemFactory() 0 5 1

How to fix   Complexity   

Complex Class

Complex classes like AbstractQueueManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractQueueManager, and based on these observations, apply Extract Interface, too.
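
As an illustration of Extract Class, the four callback fields of AbstractQueueManager (itemCallback, itemSuccessCallback, itemFailureCallback, processedCallback) share a common suffix and could move into a class of their own. The sketch below is only one possible shape: the class name QueueCallbacks and its method names are hypothetical and not part of Charcoal.

```php
<?php

/**
 * Hypothetical value object holding the callbacks that
 * AbstractQueueManager currently stores as four separate fields.
 * The name and API are assumptions made for this sketch.
 */
final class QueueCallbacks
{
    /** @var callable|null */
    private $item;

    /** @var callable|null */
    private $itemSuccess;

    /** @var callable|null */
    private $itemFailure;

    /** @var callable|null */
    private $processed;

    public function __construct(
        ?callable $item = null,
        ?callable $itemSuccess = null,
        ?callable $itemFailure = null,
        ?callable $processed = null
    ) {
        $this->item        = $item;
        $this->itemSuccess = $itemSuccess;
        $this->itemFailure = $itemFailure;
        $this->processed   = $processed;
    }

    public function item(): ?callable
    {
        return $this->item;
    }

    public function itemSuccess(): ?callable
    {
        return $this->itemSuccess;
    }

    public function itemFailure(): ?callable
    {
        return $this->itemFailure;
    }

    /**
     * Invoke the "queue processed" callback, if one was provided.
     */
    public function notifyProcessed(array $success, array $failures, array $skipped): void
    {
        if ($this->processed !== null) {
            ($this->processed)($success, $failures, $skipped);
        }
    }
}

// Usage: collect a summary line when the queue has been processed.
$log = [];
$callbacks = new QueueCallbacks(null, null, null, function (array $s, array $f, array $k) use (&$log) {
    $log[] = sprintf('%d successful, %d failed, %d skipped', count($s), count($f), count($k));
});
$callbacks->notifyProcessed(['a', 'b'], ['c'], []);
```

With such a class in place, the manager would hold a single QueueCallbacks field and the four setter methods could delegate to it or be replaced by one setter, which is the kind of reduction in size and responsibility the advice above is aiming at.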

<?php

namespace Charcoal\Queue;

use Exception;

// From PSR-3
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

// From 'charcoal-core'
use Charcoal\Loader\CollectionLoader;

// From 'charcoal-factory'
use Charcoal\Factory\FactoryInterface;

/**
 * Abstract Queue Manager
 *
 * The queue manager is used to load queued items and batch-process them.
 *
 * ## Loading queued items
 *
 * If a "queue_id" is specified, only the items for this specific queue will be loaded.
 * Otherwise, all unprocessed queue items will be processed.
 *
 * ## Type of queue items
 *
 * The type of queue items is set in the concrete subclass with the
 * `getQueueItemClass()` method, which returns the class name of a QueueItemInterface model.
 *
 * ## Callbacks
 *
 * There are 4 available callback methods that can be set:
 *
 * - `item_callback`
 *   - Called after an item has been processed.
 *   - Arguments: `QueueModelInterface $item`
 * - `item_success_callback`
 * - `item_failure_callback`
 * - `processed_callback`
 *   - Called when the entire queue has been processed
 *   - Arguments: `array $success`, `array $failures`, `array $skipped`
 */
abstract class AbstractQueueManager implements
    QueueManagerInterface,
    LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * The queue processing rate (throttle), in items per second.
     *
     * @var integer
     */
    private $rate = 0;

    /**
     * The batch limit.
     *
     * @var integer
     */
    private $limit = 0;

    /**
     * The chunk size to batch the queue with.
     *
     * @var integer
     */
    private $chunkSize = 0;

    /**
     * The queue ID.
     *
     * If set, then it will load only the items from this queue.
     * If NULL, load *all* queued items.
     *
     * @var mixed
     */
    private $queueId;

    /**
     * Items that were successfully processed
     *
     * @var array
     */
    private $successItems = [];

    /**
     * Items that failed to process
     *
     * @var array
     */
    private $failedItems = [];

    /**
     * Items that were skipped during the processing
     *
     * @var array
     */
    private $skippedItems = [];

    /**
     * The callback routine when an item is processed (whether resolved or rejected).
     *
     * @var callable $itemCallback
     */
    private $itemCallback;

    /**
     * The callback routine when the item is resolved.
     *
     * @var callable $itemSuccessCallback
     */
    private $itemSuccessCallback;

    /**
     * The callback routine when the item is rejected.
     *
     * @var callable $itemFailureCallback
     */
    private $itemFailureCallback;

    /**
     * The callback routine when the queue is processed.
     *
     * @var callable $processedCallback
     */
    private $processedCallback;

    /**
     * @var FactoryInterface $queueItemFactory
     */
    private $queueItemFactory;

    /**
     * Construct new queue manager.
     *
     * @param array $data Dependencies and settings.
     */
    public function __construct(array $data = [])
    {
        $this->setLogger($data['logger']);
        $this->setQueueItemFactory($data['queue_item_factory']);

        if (isset($data['rate'])) {
            $this->rate = intval($data['rate']);
        }

        if (isset($data['limit'])) {
            $this->limit = intval($data['limit']);
        }

        if (isset($data['chunkSize'])) {
            $this->chunkSize = intval($data['chunkSize']);
        }
    }

    /**
     * Set the manager's data.
     *
     * @param array $data The queue data to set.
     * @return AbstractQueueManager Chainable
     */
    public function setData(array $data)
    {
        if (isset($data['queue_id']) && $data['queue_id']) {
            $this->setQueueId($data['queue_id']);
        }

        return $this;
    }

    /**
     * Set the queue's ID.
     *
     * @param mixed $id The unique queue identifier.
     * @return self
     */
    public function setQueueId($id)
    {
        $this->queueId = $id;
        return $this;
    }

    /**
     * Get the queue's ID.
     *
     * @return mixed
     */
    public function queueId()
    {
        return $this->queueId;
    }

    /**
     * @param integer $rate The throttling / processing rate, in items per second.
     * @return self
     */
    public function setRate($rate)
    {
        $this->rate = intval($rate);
        return $this;
    }

    /**
     * @return integer
     */
    public function rate()
    {
        return $this->rate;
    }

    /**
     * @param integer $limit The maximum number of items to load.
     * @return self
     */
    public function setLimit($limit)
    {
        $this->limit = intval($limit);
        return $this;
    }

    /**
     * @return integer
     */
    public function limit()
    {
        return $this->limit;
    }

    /**
     * @param integer $chunkSize The size of the chunk of items to process at the same time in the queue.
     * @return self
     */
    public function setChunkSize($chunkSize)
    {
        $this->chunkSize = intval($chunkSize);
        return $this;
    }

    /**
     * @return integer
     */
    public function chunkSize()
    {
        return $this->chunkSize;
    }

    /**
     * Set the callback routine when an item is processed.
     *
     * @param callable $callback An item callback routine.
     * @return self
     */
    public function setItemCallback(callable $callback)
    {
        $this->itemCallback = $callback;
        return $this;
    }

    /**
     * Set the callback routine when the item is resolved.
     *
     * @param callable $callback An item callback routine.
     * @return self
     */
    public function setItemSuccessCallback(callable $callback)
    {
        $this->itemSuccessCallback = $callback;
        return $this;
    }

    /**
     * Set the callback routine when the item is rejected.
     *
     * @param callable $callback An item callback routine.
     * @return self
     */
    public function setItemFailureCallback(callable $callback)
    {
        // Store the failure callback in its own property (it previously overwrote the success callback).
        $this->itemFailureCallback = $callback;
        return $this;
    }

    /**
     * Set the callback routine when the queue is processed.
     *
     * @param callable $callback A queue callback routine.
     * @return self
     */
    public function setProcessedCallback(callable $callback)
    {
        $this->processedCallback = $callback;
        return $this;
    }

    /**
     * Process the queue.
     *
     * It can be processed in a single batch or in multiple chunks to reduce memory usage.
     * If no callback is passed and a self::$processedCallback is set, the latter is used.
     *
     * @param  callable $callback An optional alternative callback routine executed
     *                            after all queue items are processed.
     * @return boolean  Success / Failure
     */
    public function processQueue(callable $callback = null)
    {
        if (!is_callable($callback)) {
            $callback = $this->processedCallback;
        }

        if ($this->chunkSize() > 0) {
            $totalChunks = $this->totalChunks();
            for ($i = 0; $i < $totalChunks; $i++) {
                $queuedItems = $this->loadQueueItems();
                $this->processItems($queuedItems);
            }
        } else {
            $queuedItems = $this->loadQueueItems();
            $this->processItems($queuedItems);
        }

        if (is_callable($callback)) {
            $callback($this->successItems, $this->failedItems, $this->skippedItems);
        }

        // Argument order must match the placeholders: successful, skipped, failed.
        $summary = sprintf(
            '%d successful, %d skipped, %d failed',
            count($this->successItems),
            count($this->skippedItems),
            count($this->failedItems)
        );

        $queueId = $this->queueId();
        if ($queueId) {
            $this->logger->notice(sprintf(
                'Completed processing of queue [%s]: %s',
                $queueId,
                $summary
            ), [
                'manager' => get_called_class(),
            ]);
        } else {
            $this->logger->notice(sprintf(
                'Completed processing of queues: %s',
                $summary
            ), [
                'manager' => get_called_class(),
            ]);
        }

        return true;
    }

    /**
     * @param mixed $queuedItems The items to process.
     * @return void
     */
    private function processItems($queuedItems)
    {
        /** @var QueueItemInterface $q */
        foreach ($queuedItems as $q) {
            try {
                if ($q->processed()) {
                    // Do not process twice, ever.
                    $this->skippedItems[] = $q;
                    continue;
                }
                // Ensuring a queue item won't ever be processed twice
                $q->setProcessed(true)
                  ->setProcessedDate('now')
                  ->update([
                      'processed',
                      'processed_date',
                  ]);

                $result = $q->process(
                    $this->itemCallback,
                    $this->itemSuccessCallback,
                    $this->itemFailureCallback
                );

                if ($result === true) {
                    $this->successItems[] = $q;
                } elseif ($result === false) {
                    $this->failedItems[] = $q;
                } else {
                    $this->skippedItems[] = $q;
                }
            } catch (Exception $e) {
                $this->logger->error(
                    sprintf('Could not process a queue item: %s', $e->getMessage()),
                    [
                        'manager' => get_called_class(),
                        'queueId' => $q['queueId'],
                        'itemId'  => $q['id'],
                    ]
                );
                $this->failedItems[] = $q;
                continue;
            }

            $this->throttle();
        }
    }

    /**
     * Throttle processing of items.
     *
     * @return void
     */
    private function throttle()
    {
        if ($this->rate > 0) {
            // usleep() expects an integer number of microseconds.
            usleep((int)(1000000 / $this->rate));
        }
    }

    /**
     * Create a queue items collection loader.
     *
     * @return CollectionLoader
     */
    public function createQueueItemsLoader()
    {
        $loader = new CollectionLoader([
            'logger'  => $this->logger,
            'factory' => $this->queueItemFactory(),
            'model'   => $this->queueItemProto(),
        ]);

        return $loader;
    }

    /**
     * Configure the queue items collection loader.
     *
     * @param  CollectionLoader $loader The collection loader to prepare.
     * @return void
     */
    protected function configureQueueItemsLoader(CollectionLoader $loader)
    {
        $loader->addFilter([
            'property' => 'processed',
            'value'    => 0,
        ]);
        $loader->addFilter([
            'property' => 'processing_date',
            'operator' => '<',
            'value'    => date('Y-m-d H:i:s'),
        ]);
        $loader->addFilter([
            'condition' => '(expiry_date > NOW() OR expiry_date IS NULL)',
        ]);

        $queueId = $this->queueId();
        if ($queueId) {
            $loader->addFilter([
                'property' => 'queue_id',
                'value'    => $queueId,
            ]);
        }

        $loader->addOrder([
            'property' => 'queued_date',
            'mode'     => 'asc',
        ]);

        $loader->isConfigured = true;
Bug: The property isConfigured does not seem to exist in Charcoal\Loader\CollectionLoader.

An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.

If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.

    }

    /**
     * Retrieve the items of the current queue.
     *
     * @return \Charcoal\Model\Collection|array
     */
    public function loadQueueItems()
    {
        $loader = $this->createQueueItemsLoader();
        $this->configureQueueItemsLoader($loader);

        if ($this->chunkSize() > 0) {
            $loader->setNumPerPage($this->chunkSize());
        } elseif ($this->limit() > 0) {
            $loader->setNumPerPage($this->limit());
        }

        return $loader->load();
    }

    /**
     * Retrieve the total of queued items.
     *
     * @return integer
     */
    public function totalQueuedItems()
    {
        $loader = $this->createQueueItemsLoader();
        $this->configureQueueItemsLoader($loader);

        return $loader->loadCount();
    }

    /**
     * Retrieve the number of chunks to process.
     *
     * @return integer
     */
    public function totalChunks()
    {
        $total = $this->totalQueuedItems();

        $limit = $this->limit();
        if ($limit > 0 && $total > $limit) {
            $total = $limit;
        }

        return (int)ceil($total / $this->chunkSize());
    }

    /**
     * Retrieve the queue item prototype model.
     *
     * @return QueueItemInterface
     */
    public function queueItemProto()
    {
        return $this->queueItemFactory()->get($this->getQueueItemClass());
    }

    /**
     * Retrieve the class name of the queue item model.
     *
     * @return string
     */
    abstract public function getQueueItemClass();

    /**
     * @return FactoryInterface
     */
    protected function queueItemFactory()
    {
        return $this->queueItemFactory;
    }

    /**
     * @param FactoryInterface $factory The factory used to create queue items.
     * @return self
     */
    private function setQueueItemFactory(FactoryInterface $factory)
    {
        $this->queueItemFactory = $factory;
        return $this;
    }
}
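
The chunk count and throttle delay computed by totalChunks() and throttle() in the listing above can be checked in isolation. The standalone functions below are a sketch only: the names chunkCount and throttleDelay are hypothetical, and the guard for a chunk size of 0 is an assumption added here, since the listed totalChunks() divides by chunkSize() without checking it.

```php
<?php

/**
 * Number of chunks needed to process $total items, capped by $limit
 * (0 means no limit), in chunks of $chunkSize items.
 */
function chunkCount(int $total, int $limit, int $chunkSize): int
{
    if ($limit > 0 && $total > $limit) {
        $total = $limit;
    }

    if ($chunkSize <= 0) {
        // Assumption: without chunking, a non-empty queue is one single batch.
        return $total > 0 ? 1 : 0;
    }

    return (int) ceil($total / $chunkSize);
}

/**
 * Delay between two items, in microseconds, for a rate given in items
 * per second (0 means no throttling).
 */
function throttleDelay(int $rate): int
{
    return $rate > 0 ? intdiv(1000000, $rate) : 0;
}

// 25 queued items in chunks of 10 need 3 chunks; a limit of 20 reduces that to 2.
$withoutLimit = chunkCount(25, 0, 10);
$withLimit    = chunkCount(25, 20, 10);

// A rate of 4 items per second gives a pause of 250000 microseconds per item.
$delay = throttleDelay(4);
```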