Passed
Push — master ( 7eb953...578045 )
by Mathieu
08:49
created

AbstractQueueManager::setRate()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
3
namespace Charcoal\Queue;
4
5
use Exception;
6
7
// PSR-3 (logger) dependencies
8
use Psr\Log\LoggerAwareInterface;
9
use Psr\Log\LoggerAwareTrait;
10
11
// From `charcoal-core`
12
use Charcoal\Loader\CollectionLoader;
13
14
// From `charcoal-factory`
15
use Charcoal\Factory\FactoryInterface;
16
17
/**
18
 * Abstract Queue Manager
19
 *
20
 * The queue manager is used to load queued items and batch-process them.
21
 *
22
 * ## Loading queued items
23
 *
24
 * If a "queue_id" is specified, only the item for this specific queue will be loaded.
25
 * Otherwise, all unprocessed queue items will be processed.
26
 *
27
 * ## Type of queue items
28
 *
29
 * The type of queue items can be set in extended concrete class with the
30
 * `queue_item_proto()` method. This method should return a QueueItemInterface instance.
31
 *
32
 * ## Callbacks
33
 *
34
 * There are 4 available callback methods that can be set:
35
 *
36
 * - `item_callback`
37
 *   - Called after an item has been processed.
38
 *   - Arguments: `QueueModelInterface $item`
39
 * - `item_success_callback`
40
 * - `item_failure_callback`
41
 * - `processed_callback`
42
 *   - Called when the entire queue has been processed
43
 *   - Arguments: `array $success`, `array $failures`
44
 */
45
abstract class AbstractQueueManager implements
46
    QueueManagerInterface,
47
    LoggerAwareInterface
48
{
49
    use LoggerAwareTrait;
50
51
    /**
52
     * The queue processing rate (throttle), in items per second.
53
     *
54
     * @var integer
55
     */
56
    private $rate = 0;
57
58
    /**
59
     * The batch limit.
60
     *
61
     * @var integer
62
     */
63
    private $limit = 0;
64
65
    /**
66
     * The queue ID.
67
     *
68
     * If set, then it will load only the items from this queue.
69
     * If NULL, load *all* queued items.
70
     *
71
     * @var mixed
72
     */
73
    private $queueId;
74
75
    /**
76
     * The callback routine when an item is processed (whether resolved or rejected).
77
     *
78
     * @var callable $itemCallback
79
     */
80
    private $itemCallback;
81
82
    /**
83
     * The callback routine when the item is resolved.
84
     *
85
     * @var callable $itemSuccessCallback
86
     */
87
    private $itemSuccessCallback;
88
89
    /**
90
     * The callback routine when the item is rejected.
91
     *
92
     * @var callable $itemFailureCallback
93
     */
94
    private $itemFailureCallback;
95
96
    /**
97
     * The callback routine when the queue is processed.
98
     *
99
     * @var callable $processedCallback
100
     */
101
    private $processedCallback;
102
103
    /**
104
     * @var FactoryInterface $queueItemFactory
105
     */
106
    private $queueItemFactory;
107
108
    /**
109
     * Construct new queue manager.
110
     *
111
     * @param array $data Dependencies and settings.
112
     */
113
    public function __construct(array $data = [])
114
    {
115
        $this->setLogger($data['logger']);
116
        $this->setQueueItemFactory($data['queue_item_factory']);
117
118
        if (isset($data['rate'])) {
119
            $this->rate = intval($data['rate']);
120
        }
121
122
        if (isset($data['limit'])) {
123
            $this->limit = intval($data['limit']);
124
        }
125
    }
126
127
    /**
128
     * Set the manager's data.
129
     *
130
     * @param array $data The queue data to set.
131
     * @return AbstractQueueManager Chainable
132
     */
133
    public function setData(array $data)
134
    {
135
        if (isset($data['queue_id']) && $data['queue_id']) {
136
            $this->setQueueId($data['queue_id']);
137
        }
138
139
        return $this;
140
    }
141
142
    /**
143
     * Set the queue's ID.
144
     *
145
     * @param mixed $id The unique queue identifier.
146
     * @return self
147
     */
148
    public function setQueueId($id)
149
    {
150
        $this->queueId = $id;
151
        return $this;
152
    }
153
154
    /**
155
     * Get the queue's ID.
156
     *
157
     * @return mixed
158
     */
159
    public function queueId()
160
    {
161
        return $this->queueId;
162
    }
163
164
    /**
165
     * @param integer $rate The throttling / processing rate, in items per second.
166
     * @return self
167
     */
168
    public function setRate($rate)
169
    {
170
        $this->rate = intval($rate);
171
        return $this;
172
    }
173
174
    /**
175
     * @return integer
176
     */
177
    public function rate()
178
    {
179
        return $this->rate;
180
    }
181
182
    /**
183
     * @param integer $limit The maximum number of items to load.
184
     * @return self
185
     */
186
    public function setLimit($limit)
187
    {
188
        $this->limit = intval($limit);
189
        return $this;
190
    }
191
192
    /**
193
     * @return integer
194
     */
195
    public function limit()
196
    {
197
        return $this->limit;
198
    }
199
200
    /**
201
     * Set the callback routine when an item is processed.
202
     *
203
     * @param callable $callback A item callback routine.
204
     * @return self
205
     */
206
    public function setItemCallback(callable $callback)
207
    {
208
        $this->itemCallback = $callback;
209
        return $this;
210
    }
211
212
    /**
213
     * Set the callback routine when the item is resolved.
214
     *
215
     * @param callable $callback A item callback routine.
216
     * @return self
217
     */
218
    public function setItemSuccessCallback(callable $callback)
219
    {
220
        $this->itemSuccessCallback = $callback;
221
        return $this;
222
    }
223
224
    /**
225
     * Set the callback routine when the item is rejected.
226
     *
227
     * @param callable $callback A item callback routine.
228
     * @return self
229
     */
230
    public function setItemFailureCallback(callable $callback)
231
    {
232
        $this->itemSuccessCallback = $callback;
233
        return $this;
234
    }
235
236
    /**
237
     * Set the callback routine when the queue is processed.
238
     *
239
     * @param callable $callback A queue callback routine.
240
     * @return self
241
     */
242
    public function setProcessedCallback(callable $callback)
243
    {
244
        $this->processedCallback = $callback;
245
        return $this;
246
    }
247
248
    /**
249
     * Process the items of the queue.
250
     *
251
     * If no callback is passed and a self::$processedCallback is set, the latter is used.
252
     *
253
     * @param  callable $callback An optional alternative callback routine executed
254
     *                            after all queue items are processed.
255
     * @return boolean  Success / Failure
256
     */
257
    public function processQueue(callable $callback = null)
258
    {
259
        $queued = $this->loadQueueItems();
260
261
        if (!is_callable($callback)) {
262
            $callback = $this->processedCallback;
263
        }
264
265
        $success  = [];
266
        $failures = [];
267
        $skipped  = [];
268
        foreach ($queued as $q) {
269
            try {
270
                $res = $q->process($this->itemCallback, $this->itemSuccessCallback, $this->itemFailureCallback);
271
                if ($res === true) {
272
                    $success[] = $q;
273
                } elseif ($res === false) {
274
                    $failures[] = $q;
275
                } else {
276
                    $skipped[] = $q;
277
                }
278
            } catch (Exception $e) {
279
                $this->logger->error(
280
                    sprintf('Could not process a queue item: %s', $e->getMessage())
281
                );
282
                $failures[] = $q;
283
                continue;
284
            }
285
286
            // Throttle according to processing rate.
287
            if ($this->rate > 0) {
288
                usleep(1000000/$this->rate);
289
            }
290
        }
291
292
        if (is_callable($callback)) {
293
            $callback($success, $failures, $skipped);
294
        }
295
296
        return true;
297
    }
298
299
    /**
300
     * Retrieve the items of the current queue.
301
     *
302
     * @return \Charcoal\Model\Collection|array
303
     */
304
    public function loadQueueItems()
305
    {
306
        $loader = new CollectionLoader([
307
            'logger' => $this->logger,
308
            'factory' => $this->queueItemFactory()
309
        ]);
310
        $loader->setModel($this->queueItemProto());
0 ignored issues
show
Documentation introduced by
$this->queueItemProto() is of type object<Charcoal\Queue\QueueItemInterface>, but the function expects a string|object<Charcoal\Model\ModelInterface>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
311
        $loader->addFilter([
312
            'property' => 'processed',
313
            'value'      => 0
314
        ]);
315
        $loader->addFilter([
316
             'property' => 'processing_date',
317
             'value'      => date('Y-m-d H:i:s'),
318
             'operator' => '<'
319
        ]);
320
321
        $queueId = $this->queueId();
322
        if ($queueId) {
323
            $loader->addFilter([
324
                'property' => 'queue_id',
325
                'value'    => $queueId
326
            ]);
327
        }
328
329
        $loader->addOrder([
330
            'property' => 'queued_date',
331
            'mode'     => 'asc'
332
        ]);
333
        $queued = $loader->load();
334
335
        if ($this->limit > 0) {
336
            $loader->setNumPerPage($this->limit);
337
            $loader->setPage(0);
338
        }
339
340
        return $queued;
341
    }
342
343
    /**
344
     * Retrieve the queue item's model.
345
     *
346
     * @return QueueItemInterface
347
     */
348
    abstract public function queueItemProto();
349
350
    /**
351
     * @return FactoryInterface
352
     */
353
    protected function queueItemFactory()
354
    {
355
        return $this->queueItemFactory;
356
    }
357
358
    /**
359
     * @param FactoryInterface $factory The factory used to create queue items.
360
     * @return self
361
     */
362
    private function setQueueItemFactory(FactoryInterface $factory)
363
    {
364
        $this->queueItemFactory = $factory;
365
        return $this;
366
    }
367
}
368