1 | <?php |
||
45 | abstract class AbstractQueueManager implements |
||
46 | QueueManagerInterface, |
||
47 | LoggerAwareInterface |
||
48 | { |
||
49 | use LoggerAwareTrait; |
||
50 |||
51 | /** |
||
52 | * The queue processing rate (throttle), in items per second. |
||
53 | * |
||
54 | * @var integer |
||
55 | */ |
||
56 | private $rate = 0; |
||
57 |||
58 | /** |
||
59 | * The batch limit (the maximum number of items to load). |
||
60 | * |
||
61 | * @var integer |
||
62 | */ |
||
63 | private $limit = 0; |
||
64 |||
65 | /** |
||
66 | * The chunk size to batch the queue with. |
||
67 | * |
||
68 | * @var integer|null |
||
69 | */ |
||
70 | private $chunkSize = null; |
||
71 |||
72 | /** |
||
73 | * The queue ID. |
||
74 | * |
||
75 | * If set, then it will load only the items from this queue. |
||
76 | * If NULL, load *all* queued items. |
||
77 | * |
||
78 | * @var mixed |
||
79 | */ |
||
80 | private $queueId; |
||
81 |||
82 | /** |
||
83 | * Items that were successfully processed. |
||
84 | * |
||
85 | * @var array |
||
86 | */ |
||
87 | private $successItems = []; |
||
88 |||
89 | /** |
||
90 | * Items that failed to process. |
||
91 | * |
||
92 | * @var array |
||
93 | */ |
||
94 | private $failedItems = []; |
||
95 |||
96 | /** |
||
97 | * Items that were skipped during processing. |
||
98 | * |
||
99 | * @var array |
||
100 | */ |
||
101 | private $skippedItems = []; |
||
102 |||
103 | /** |
||
104 | * The callback routine when an item is processed (whether resolved or rejected). |
||
105 | * |
||
106 | * @var callable|null $itemCallback |
||
107 | */ |
||
108 | private $itemCallback; |
||
109 |||
110 | /** |
||
111 | * The callback routine when the item is resolved. |
||
112 | * |
||
113 | * @var callable|null $itemSuccessCallback |
||
114 | */ |
||
115 | private $itemSuccessCallback; |
||
116 |||
117 | /** |
||
118 | * The callback routine when the item is rejected. |
||
119 | * |
||
120 | * @var callable|null $itemFailureCallback |
||
121 | */ |
||
122 | private $itemFailureCallback; |
||
123 |||
124 | /** |
||
125 | * The callback routine when the queue is processed. |
||
126 | * |
||
127 | * @var callable|null $processedCallback |
||
128 | */ |
||
129 | private $processedCallback; |
||
130 |||
131 | /** |
||
132 | * @var FactoryInterface $queueItemFactory The factory used to create queue items. |
||
133 | */ |
||
134 | private $queueItemFactory; |
||
135 |||
136 | /** |
||
137 | * Construct new queue manager. |
||
138 | * |
||
139 | * @param array $data Dependencies and settings. |
||
140 | */ |
||
141 | public function __construct(array $data = []) |
||
158 |||
159 | /** |
||
160 | * Set the manager's data. |
||
161 | * |
||
162 | * @param array $data The queue data to set. |
||
163 | * @return self Chainable |
||
164 | */ |
||
165 | public function setData(array $data) |
||
173 |||
174 | /** |
||
175 | * Set the queue's ID. |
||
176 | * |
||
177 | * @param mixed $id The unique queue identifier. |
||
178 | * @return self |
||
179 | */ |
||
180 | public function setQueueId($id) |
||
185 |||
186 | /** |
||
187 | * Get the queue's ID. |
||
188 | * |
||
189 | * @return mixed |
||
190 | */ |
||
191 | public function queueId() |
||
195 |||
196 | /** |
||
197 | * @param integer $rate The throttling / processing rate, in items per second. |
||
198 | * @return self |
||
199 | */ |
||
200 | public function setRate($rate) |
||
205 |||
206 | /** |
||
207 | * @return integer |
||
208 | */ |
||
209 | public function rate() |
||
213 |||
214 | /** |
||
215 | * @param integer $limit The maximum number of items to load. |
||
216 | * @return self |
||
217 | */ |
||
218 | public function setLimit($limit) |
||
223 |||
224 | /** |
||
225 | * @return integer |
||
226 | */ |
||
227 | public function limit() |
||
231 |||
232 | /** |
||
233 | * @param integer $chunkSize The size of the chunk of items to process at the same time in the queue. |
||
234 | * @return self |
||
235 | */ |
||
236 | public function setChunkSize($chunkSize) |
||
241 |||
242 | /** |
||
243 | * @return integer |
||
244 | */ |
||
245 | public function chunkSize() |
||
249 |||
250 | /** |
||
251 | * Set the callback routine when an item is processed. |
||
252 | * |
||
253 | * @param callable $callback An item callback routine. |
||
254 | * @return self |
||
255 | */ |
||
256 | public function setItemCallback(callable $callback) |
||
261 |||
262 | /** |
||
263 | * Set the callback routine when the item is resolved. |
||
264 | * |
||
265 | * @param callable $callback An item callback routine. |
||
266 | * @return self |
||
267 | */ |
||
268 | public function setItemSuccessCallback(callable $callback) |
||
273 |||
274 | /** |
||
275 | * Set the callback routine when the item is rejected. |
||
276 | * |
||
277 | * @param callable $callback An item callback routine. |
||
278 | * @return self |
||
279 | */ |
||
280 | public function setItemFailureCallback(callable $callback) |
||
285 |||
286 | /** |
||
287 | * Set the callback routine when the queue is processed. |
||
288 | * |
||
289 | * @param callable $callback A queue callback routine. |
||
290 | * @return self |
||
291 | */ |
||
292 | public function setProcessedCallback(callable $callback) |
||
297 |||
298 | /** |
||
299 | * Process the queue. |
||
300 | * |
||
301 | * It can be processed in a single batch or in multiple chunks to reduce memory usage. |
||
302 | * If no callback is passed and a self::$processedCallback is set, the latter is used. |
||
303 | * |
||
304 | * @param callable|null $callback An optional alternative callback routine executed |
||
305 | * after all queue items are processed. |
||
306 | * @return boolean Success / Failure |
||
307 | */ |
||
308 | public function processQueue(callable $callback = null) |
||
331 |||
332 | /** |
||
333 | * @param mixed $queuedItems The items to process. |
||
334 | * @return void |
||
335 | */ |
||
336 | private function processItems($queuedItems) |
||
362 |||
363 | /** |
||
364 | * Retrieve the items of the current queue. |
||
365 | * |
||
366 | * @return \Charcoal\Model\Collection|array |
||
367 | */ |
||
368 | public function loadQueueItems() |
||
411 |||
412 | /** |
||
413 | * Retrieve the total number of queued items. |
||
414 | * |
||
415 | * @return integer |
||
416 | */ |
||
417 | public function totalQueuedItems() |
||
451 |||
452 | /** |
||
453 | * Retrieve the number of chunks to process. |
||
454 | * |
||
455 | * @return integer |
||
456 | */ |
||
457 | public function totalChunks() |
||
461 |||
462 | /** |
||
463 | * Retrieve the queue item's model. |
||
464 | * |
||
465 | * @return QueueItemInterface |
||
466 | */ |
||
467 | abstract public function queueItemProto(); |
||
468 |||
469 | /** |
||
470 | * @return FactoryInterface |
||
471 | */ |
||
472 | protected function queueItemFactory() |
||
476 |||
477 | /** |
||
478 | * @param FactoryInterface $factory The factory used to create queue items. |
||
479 | * @return self |
||
480 | */ |
||
481 | private function setQueueItemFactory(FactoryInterface $factory) |
||
486 | } |
||
487 |