Passed
Pull Request — master (#1)
by
unknown
18:00
created

AbstractQueueManager::totalQueuedItems()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 34
rs 9.376
c 0
b 0
f 0
cc 2
nc 2
nop 0
1
<?php
2
3
namespace Charcoal\Queue;
4
5
use Exception;
6
7
// From PSR-3
8
use Psr\Log\LoggerAwareInterface;
9
use Psr\Log\LoggerAwareTrait;
10
11
// From 'charcoal-core'
12
use Charcoal\Loader\CollectionLoader;
13
14
// From 'charcoal-factory'
15
use Charcoal\Factory\FactoryInterface;
16
17
/**
18
 * Abstract Queue Manager
19
 *
20
 * The queue manager is used to load queued items and batch-process them.
21
 *
22
 * ## Loading queued items
23
 *
24
 * If a "queue_id" is specified, only the item for this specific queue will be loaded.
25
 * Otherwise, all unprocessed queue items will be processed.
26
 *
27
 * ## Type of queue items
28
 *
29
 * The type of queue items can be set in extended concrete class with the
30
 * `queue_item_proto()` method. This method should return a QueueItemInterface instance.
31
 *
32
 * ## Callbacks
33
 *
34
 * There are 4 available callback methods that can be set:
35
 *
36
 * - `item_callback`
37
 *   - Called after an item has been processed.
38
 *   - Arguments: `QueueModelInterface $item`
39
 * - `item_success_callback`
40
 * - `item_failure_callback`
41
 * - `processed_callback`
42
 *   - Called when the entire queue has been processed
43
 *   - Arguments: `array $success`, `array $failures`
44
 */
45
abstract class AbstractQueueManager implements
46
    QueueManagerInterface,
47
    LoggerAwareInterface
48
{
49
    use LoggerAwareTrait;
50
51
    /**
52
     * The queue processing rate (throttle), in items per second.
53
     *
54
     * @var integer
55
     */
56
    private $rate = 0;
57
58
    /**
59
     * The batch limit.
60
     *
61
     * @var integer
62
     */
63
    private $limit = 0;
64
65
    /**
66
     * The chunk size to batch the queue with.
67
     *
68
     * @var integer|null
69
     */
70
    private $chunkSize = null;
71
72
    /**
73
     * The queue ID.
74
     *
75
     * If set, then it will load only the items from this queue.
76
     * If NULL, load *all* queued items.
77
     *
78
     * @var mixed
79
     */
80
    private $queueId;
81
82
    /**
83
     * Items that were successfully processed
84
     *
85
     * @var array
86
     */
87
    private $successItems = [];
88
89
    /**
90
     * Item that failed to process
91
     *
92
     * @var array
93
     */
94
    private $failedItems = [];
95
96
    /**
97
     * Items that were skipped during the processing
98
     *
99
     * @var array
100
     */
101
    private $skippedItems = [];
102
103
    /**
104
     * The callback routine when an item is processed (whether resolved or rejected).
105
     *
106
     * @var callable $itemCallback
107
     */
108
    private $itemCallback;
109
110
    /**
111
     * The callback routine when the item is resolved.
112
     *
113
     * @var callable $itemSuccessCallback
114
     */
115
    private $itemSuccessCallback;
116
117
    /**
118
     * The callback routine when the item is rejected.
119
     *
120
     * @var callable $itemFailureCallback
121
     */
122
    private $itemFailureCallback;
123
124
    /**
125
     * The callback routine when the queue is processed.
126
     *
127
     * @var callable $processedCallback
128
     */
129
    private $processedCallback;
130
131
    /**
132
     * @var FactoryInterface $queueItemFactory
133
     */
134
    private $queueItemFactory;
135
136
    /**
137
     * Construct new queue manager.
138
     *
139
     * @param array $data Dependencies and settings.
140
     */
141
    public function __construct(array $data = [])
142
    {
143
        $this->setLogger($data['logger']);
144
        $this->setQueueItemFactory($data['queue_item_factory']);
145
146
        if (isset($data['rate'])) {
147
            $this->rate = intval($data['rate']);
148
        }
149
150
        if (isset($data['limit'])) {
151
            $this->limit = intval($data['limit']);
152
        }
153
154
        if (isset($data['chunkSize'])) {
155
            $this->chunkSize = intval($data['chunkSize']);
156
        }
157
    }
158
159
    /**
160
     * Set the manager's data.
161
     *
162
     * @param array $data The queue data to set.
163
     * @return AbstractQueueManager Chainable
164
     */
165
    public function setData(array $data)
166
    {
167
        if (isset($data['queue_id']) && $data['queue_id']) {
168
            $this->setQueueId($data['queue_id']);
169
        }
170
171
        return $this;
172
    }
173
174
    /**
175
     * Set the queue's ID.
176
     *
177
     * @param mixed $id The unique queue identifier.
178
     * @return self
179
     */
180
    public function setQueueId($id)
181
    {
182
        $this->queueId = $id;
183
        return $this;
184
    }
185
186
    /**
187
     * Get the queue's ID.
188
     *
189
     * @return mixed
190
     */
191
    public function queueId()
192
    {
193
        return $this->queueId;
194
    }
195
196
    /**
197
     * @param integer $rate The throttling / processing rate, in items per second.
198
     * @return self
199
     */
200
    public function setRate($rate)
201
    {
202
        $this->rate = intval($rate);
203
        return $this;
204
    }
205
206
    /**
207
     * @return integer
208
     */
209
    public function rate()
210
    {
211
        return $this->rate;
212
    }
213
214
    /**
215
     * @param integer $limit The maximum number of items to load.
216
     * @return self
217
     */
218
    public function setLimit($limit)
219
    {
220
        $this->limit = intval($limit);
221
        return $this;
222
    }
223
224
    /**
225
     * @return integer
226
     */
227
    public function limit()
228
    {
229
        return $this->limit;
230
    }
231
232
    /**
233
     * @param integer $chunkSize The size of the chunk of items to process at the same time in the queue.
234
     * @return self
235
     */
236
    public function setChunkSize($chunkSize)
237
    {
238
        $this->chunkSize = intval($chunkSize);
239
        return $this;
240
    }
241
242
    /**
243
     * @return integer
244
     */
245
    public function chunkSize()
246
    {
247
        return $this->chunkSize;
248
    }
249
250
    /**
251
     * Set the callback routine when an item is processed.
252
     *
253
     * @param callable $callback A item callback routine.
254
     * @return self
255
     */
256
    public function setItemCallback(callable $callback)
257
    {
258
        $this->itemCallback = $callback;
259
        return $this;
260
    }
261
262
    /**
263
     * Set the callback routine when the item is resolved.
264
     *
265
     * @param callable $callback A item callback routine.
266
     * @return self
267
     */
268
    public function setItemSuccessCallback(callable $callback)
269
    {
270
        $this->itemSuccessCallback = $callback;
271
        return $this;
272
    }
273
274
    /**
275
     * Set the callback routine when the item is rejected.
276
     *
277
     * @param callable $callback A item callback routine.
278
     * @return self
279
     */
280
    public function setItemFailureCallback(callable $callback)
281
    {
282
        $this->itemSuccessCallback = $callback;
283
        return $this;
284
    }
285
286
    /**
287
     * Set the callback routine when the queue is processed.
288
     *
289
     * @param callable $callback A queue callback routine.
290
     * @return self
291
     */
292
    public function setProcessedCallback(callable $callback)
293
    {
294
        $this->processedCallback = $callback;
295
        return $this;
296
    }
297
298
    /**
299
     * Process the queue.
300
     *
301
     * It can be process in a single batch or in multiple chunks to reduce memory
302
     * If no callback is passed and a self::$processedCallback is set, the latter is used.
303
     *
304
     * @param  callable $callback An optional alternative callback routine executed
305
     *                            after all queue items are processed.
306
     * @return boolean  Success / Failure
307
     */
308
    public function processQueue(callable $callback = null)
309
    {
310
        if (!is_callable($callback)) {
311
            $callback = $this->processedCallback;
312
        }
313
314
        if (!is_null($this->chunkSize())) {
315
            $totalChunks = $this->totalChunks();
316
            for ($i = 0; $i <= $totalChunks; $i++) {
317
                $queuedItems = $this->loadQueueItems();
318
                $this->processItems($queuedItems);
319
            }
320
        } else {
321
            $queuedItems = $this->loadQueueItems();
322
            $this->processItems($queuedItems);
323
        }
324
325
        if (is_callable($callback)) {
326
            $callback($this->successItems, $this->failedItems, $this->skippedItems);
327
        }
328
329
        return true;
330
    }
331
332
    /**
333
     * @param mixed $queuedItems The items to process.
334
     * @return void
335
     */
336
    private function processItems($queuedItems)
337
    {
338
        foreach ($queuedItems as $q) {
339
            try {
340
                $res = $q->process($this->itemCallback, $this->itemSuccessCallback, $this->itemFailureCallback);
341
                if ($res === true) {
342
                    $this->successItems[] = $q;
343
                } elseif ($res === false) {
344
                    $this->failedItems[] = $q;
345
                } else {
346
                    $this->skippedItems[] = $q;
347
                }
348
            } catch (Exception $e) {
349
                $this->logger->error(
350
                    sprintf('Could not process a queue item: %s', $e->getMessage())
351
                );
352
                $this->failedItems[] = $q;
353
                continue;
354
            }
355
356
            // Throttle according to processing rate.
357
            if ($this->rate > 0) {
358
                usleep(1000000 / $this->rate);
359
            }
360
        }
361
    }
362
363
    /**
364
     * Retrieve the items of the current queue.
365
     *
366
     * @return \Charcoal\Model\Collection|array
367
     */
368
    public function loadQueueItems()
369
    {
370
        $loader = new CollectionLoader([
371
            'logger'  => $this->logger,
372
            'factory' => $this->queueItemFactory(),
373
        ]);
374
        $loader->setModel($this->queueItemProto());
375
        $loader->addFilter([
376
            'property' => 'processed',
377
            'value'    => 0,
378
        ]);
379
        $loader->addFilter([
380
            'property' => 'processing_date',
381
            'operator' => '<',
382
            'value'    => date('Y-m-d H:i:s'),
383
        ]);
384
385
        $queueId = $this->queueId();
386
        if ($queueId) {
387
            $loader->addFilter([
388
                'property' => 'queue_id',
389
                'value'    => $queueId,
390
            ]);
391
        }
392
393
        $loader->addOrder([
394
            'property' => 'queued_date',
395
            'mode'     => 'asc',
396
        ]);
397
398
        if (!is_null($this->chunkSize())) {
399
            $loader->setNumPerPage($this->chunkSize());
400
        }
401
402
        if ($this->limit() > 0) {
403
            $loader->setNumPerPage($this->limit());
404
            $loader->setPage(0);
405
        }
406
407
        $queued = $loader->load();
408
409
        return $queued;
410
    }
411
412
    /**
413
     * Retrieve the total of queued items.
414
     *
415
     * @return integer
416
     */
417
    public function totalQueuedItems()
418
    {
419
        $loader = new CollectionLoader([
420
            'logger'  => $this->logger,
421
            'factory' => $this->queueItemFactory(),
422
        ]);
423
        $loader->setModel($this->queueItemProto());
424
        $loader->addFilter([
425
            'property' => 'processed',
426
            'value'    => 0,
427
        ]);
428
        $loader->addFilter([
429
            'property' => 'processing_date',
430
            'operator' => '<',
431
            'value'    => date('Y-m-d H:i:s'),
432
        ]);
433
434
        $queueId = $this->queueId();
435
        if ($queueId) {
436
            $loader->addFilter([
437
                'property' => 'queue_id',
438
                'value'    => $queueId,
439
            ]);
440
        }
441
442
        $loader->addOrder([
443
            'property' => 'queued_date',
444
            'mode'     => 'asc',
445
        ]);
446
447
        $total = $loader->loadCount();
448
449
        return $total;
450
    }
451
452
    /**
453
     * Retrieve the number of chunks to process.
454
     *
455
     * @return integer
456
     */
457
    public function totalChunks()
458
    {
459
        return (int)ceil($this->totalQueuedItems() / $this->chunkSize());
460
    }
461
462
    /**
463
     * Retrieve the queue item's model.
464
     *
465
     * @return QueueItemInterface
466
     */
467
    abstract public function queueItemProto();
468
469
    /**
470
     * @return FactoryInterface
471
     */
472
    protected function queueItemFactory()
473
    {
474
        return $this->queueItemFactory;
475
    }
476
477
    /**
478
     * @param FactoryInterface $factory The factory used to create queue items.
479
     * @return self
480
     */
481
    private function setQueueItemFactory(FactoryInterface $factory)
482
    {
483
        $this->queueItemFactory = $factory;
484
        return $this;
485
    }
486
}
487