Passed
Push — custom-exceptions ( 3ddd8b...920e5f )
by Tomas Norre
06:21
created

QueueRepository   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 587
Duplicated Lines 0 %

Test Coverage

Coverage 70.17%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 286
dl 0
loc 587
ccs 287
cts 409
cp 0.7017
rs 7.92
c 1
b 0
f 0
wmc 51

29 Methods

Rating   Name   Duplication   Size   Complexity  
A unsetQueueProcessId() 0 10 1
A __construct() 0 2 1
A getLastProcessedEntriesTimestamps() 0 16 2
A countNonExecutedItemsByProcess() 0 13 1
A getSetIdWithUnprocessedEntries() 0 19 2
A getTotalQueueEntriesByConfiguration() 0 22 3
A countAllPendingItems() 0 14 1
A getPerformanceData() 0 21 2
A countAllUnassignedPendingItems() 0 14 1
A findByQueueId() 0 12 2
A countPendingItemsGroupedByConfigurationKey() 0 15 1
A getUnprocessedItems() 0 11 1
A countAllAssignedPendingItems() 0 14 1
A cleanupQueue() 0 18 3
A cleanUpOldQueueEntries() 0 19 2
A findOldestEntryForProcess() 0 3 1
A isPageInQueueTimed() 0 3 1
A getLastProcessedEntries() 0 16 2
A countUnprocessedItems() 0 3 1
A findYoungestEntryForProcess() 0 3 1
A getAvailableSets() 0 17 2
B isPageInQueue() 0 45 7
A countExecutedItemsByProcess() 0 13 1
A fetchRecordsToBeCrawled() 0 16 1
A noUnprocessedQueueEntriesForPageWithConfigurationHashExist() 0 21 2
A unsetProcessScheduledAndProcessIdForQueueEntries() 0 12 1
A flushQueue() 0 20 4
A updateProcessIdAndSchedulerForQueueIds() 0 11 1
A getFirstOrLastObjectByProcess() 0 15 2

How to fix   Complexity   

Complex Class

Complex classes like QueueRepository often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
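As a rough illustration of Extract Class, the three countAll...PendingItems() methods listed above share a prefix and suffix and differ in only one condition, so they are a plausible candidate. The sketch below is not the project's code: the class name `PendingItemCounter`, the private helper `countPending()`, and the in-memory row array (used instead of the query builder so the example runs on its own) are all assumptions made for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical class extracted from QueueRepository. It works on plain row
 * arrays instead of querying tx_crawler_queue, purely to keep the sketch runnable.
 */
final class PendingItemCounter
{
    /** @var array[] rows with the keys process_id, process_scheduled, exec_time, scheduled */
    private $rows;

    /** @var int reference timestamp, stands in for time() */
    private $now;

    public function __construct(array $rows, int $now)
    {
        $this->rows = $rows;
        $this->now = $now;
    }

    public function countAllPendingItems(): int
    {
        return $this->countPending(static function (array $row): bool {
            return $row['process_scheduled'] === 0;
        });
    }

    public function countAllAssignedPendingItems(): int
    {
        return $this->countPending(static function (array $row): bool {
            return $row['process_id'] !== '';
        });
    }

    public function countAllUnassignedPendingItems(): int
    {
        return $this->countPending(static function (array $row): bool {
            return $row['process_id'] === '';
        });
    }

    /**
     * Shared part of all three counters: not executed yet and scheduled for now or earlier.
     */
    private function countPending(callable $extraCondition): int
    {
        $count = 0;
        foreach ($this->rows as $row) {
            if ($row['exec_time'] === 0 && $row['scheduled'] <= $this->now && $extraCondition($row)) {
                $count++;
            }
        }

        return $count;
    }
}

// Usage with four made-up rows and a made-up "now" of 200.
$counter = new PendingItemCounter([
    ['process_id' => '', 'process_scheduled' => 0, 'exec_time' => 0, 'scheduled' => 100],
    ['process_id' => 'abc', 'process_scheduled' => 90, 'exec_time' => 0, 'scheduled' => 100],
    ['process_id' => 'abc', 'process_scheduled' => 90, 'exec_time' => 95, 'scheduled' => 100],
    ['process_id' => '', 'process_scheduled' => 0, 'exec_time' => 0, 'scheduled' => 500],
], 200);
```

In a real extraction the helper would build the shared WHERE conditions once, and QueueRepository would either delegate to the new class or callers would use it directly.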

While breaking up the class, it is a good idea to analyze how other classes use QueueRepository, and based on these observations, apply Extract Interface, too.
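A minimal sketch of Extract Interface, assuming some callers only need to know whether a page is already queued. The interface name `PageQueueLookup`, the in-memory implementation, and the caller `filterPagesNotYetQueued()` are hypothetical; only the method name isPageInQueue() comes from the class itself, and its optional parameters are left out here for brevity.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical narrow interface: just what an "is this page queued?" caller needs.
 */
interface PageQueueLookup
{
    public function isPageInQueue(int $uid): bool;
}

/**
 * In-memory stand-in for QueueRepository so that the sketch runs without a database.
 */
final class InMemoryPageQueue implements PageQueueLookup
{
    /** @var int[] */
    private $queuedPageIds;

    public function __construct(array $queuedPageIds)
    {
        $this->queuedPageIds = $queuedPageIds;
    }

    public function isPageInQueue(int $uid): bool
    {
        return in_array($uid, $this->queuedPageIds, true);
    }
}

/**
 * Hypothetical caller that depends on the interface, not on the concrete repository.
 */
function filterPagesNotYetQueued(PageQueueLookup $queue, array $pageIds): array
{
    return array_values(array_filter($pageIds, static function (int $uid) use ($queue): bool {
        return ! $queue->isPageInQueue($uid);
    }));
}

// Pages 1 and 3 are queued, so only 2 and 4 remain.
$notQueued = filterPagesNotYetQueued(new InMemoryPageQueue([1, 3]), [1, 2, 3, 4]);
```

Callers typed against such an interface can be tested with a stand-in like the one above, and they keep working if the counting or cleanup methods later move to other classes.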

<?php

declare(strict_types=1);

namespace AOE\Crawler\Domain\Repository;

/***************************************************************
 *  Copyright notice
 *
 *  (c) 2020 AOE GmbH <[email protected]>
 *
 *  All rights reserved
 *
 *  This script is part of the TYPO3 project. The TYPO3 project is
 *  free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  The GNU General Public License can be found at
 *  http://www.gnu.org/copyleft/gpl.html.
 *
 *  This script is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  This copyright notice MUST APPEAR in all copies of the script!
 ***************************************************************/

use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
use AOE\Crawler\Domain\Model\Process;
use Psr\Log\LoggerAwareInterface;
use TYPO3\CMS\Core\Database\Connection;
use TYPO3\CMS\Core\Database\ConnectionPool;
use TYPO3\CMS\Core\Utility\GeneralUtility;
use TYPO3\CMS\Core\Utility\MathUtility;

/**
 * Class QueueRepository
 *
 * @package AOE\Crawler\Domain\Repository
 */
class QueueRepository extends AbstractRepository implements LoggerAwareInterface
{
    use \Psr\Log\LoggerAwareTrait;

    /**
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    public function __construct()
    {
        // Intentionally left empty
    }

    public function unsetQueueProcessId(string $processId): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to find the youngest entry for a given process.
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * This method is used to find the oldest entry for a given process.
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Counts all executed items of a process.
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Cast explicitly: depending on the driver the count may come back as a string
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts items of a process which have not yet been processed/executed
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Cast explicitly: depending on the driver the count may come back as a string
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Get items which have not been processed yet
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()->fetchAll();
    }

    /**
     * Count items which have not been processed yet
     */
    public function countUnprocessedItems(): int
    {
        return count($this->getUnprocessedItems());
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date.
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Cast explicitly: depending on the driver the count may come back as a string
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are assigned to a process.
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Bind the empty string as a parameter instead of passing the raw SQL literal ""
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are not assigned to a process.
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Bind the empty string as a parameter instead of passing the raw SQL literal ""
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Count pending queue entries grouped by configuration key
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Get set id with unprocessed entries
     *
     * @return array array of set ids
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = intval($row['set_id']);
        }

        return $setIds;
    }

    /**
     * Get total queue entries by configuration
     *
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $totals = [];
        if (count($setIds) > 0) {
            $statement = $queryBuilder
                ->from($this->tableName)
                ->selectLiteral('count(*) as c')
                ->addSelect('configuration')
                ->where(
                    // Bind the ids as an integer list instead of concatenating them into the SQL
                    $queryBuilder->expr()->in('set_id', $queryBuilder->createNamedParameter($setIds, Connection::PARAM_INT_ARRAY)),
                    $queryBuilder->expr()->lt('scheduled', time())
                )
                ->groupBy('configuration')
                ->execute();

            while ($row = $statement->fetch()) {
                $totals[$row['configuration']] = $row['c'];
            }
        }

        return $totals;
    }

    /**
     * Get the timestamps of the last processed entries
     *
     * @param int $limit
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row['exec_time'];
        }

        return $rows;
    }

    /**
     * Get the last processed entries
     *
     * @param int $limit
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Get performance statistics data
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines if a page is queued
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        // TODO: This check looks obsolete, as the int type declaration (with strict types) already rejects non-integer values for $uid
        if (! MathUtility::canBeInterpretedAsInteger($uid)) {
            throw new \InvalidArgumentException('Invalid parameter type', 1468931945);
        }

        $isPageInQueue = false;

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently this does not work if the page doesn't exist. See tests
        $count = $statement
            ->execute()
            ->fetchColumn(0);

        if ($count !== false && $count > 0) {
            $isPageInQueue = true;
        }

        return $isPageInQueue;
    }

    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        // Note: $timed_only is not passed here, so this currently behaves exactly like isPageInQueue()
        return $this->isPageInQueue($uid, $show_unprocessed);
    }

    public function getAvailableSets(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row;
        }

        return $rows;
    }

    public function findByQueueId(string $queueId): ?array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queueRec = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
            )
            ->execute()
            ->fetch();
        return is_array($queueRec) ? $queueRec : null;
    }

    public function cleanupQueue(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];

        if ($purgeDays > 0) {
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
            $del = $queryBuilderDelete
                ->delete($this->tableName)
                ->where(
                    'exec_time != 0 AND exec_time < ' . $purgeDate
                )->execute();

            if ($del === false) {
                $this->logger->info(
                    'Records could not be deleted.'
                );
            }
        }
    }

    /**
     * Cleans up entries that stayed in the queue for too long. By default, these are:
     * - processed entries that are more than 1.5 days old
     * - scheduled entries that are more than 7 days old
     */
    public function cleanUpOldQueueEntries(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400; // 24*60*60 Seconds in 24 hours
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;

        $now = time();
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $del = $queryBuilder
            ->delete($this->tableName)
            ->where(
                $condition
            )->execute();

        if ($del === false) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }

    public function fetchRecordsToBeCrawled(int $countInARun)
    {
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        return $queryBuilderSelect
            ->select('qid', 'scheduled')
            ->from($this->tableName)
            ->where(
                $queryBuilderSelect->expr()->eq('exec_time', 0),
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
                $queryBuilderSelect->expr()->lte('scheduled', time())
            )
            ->orderBy('scheduled')
            ->addOrderBy('qid')
            ->setMaxResults($countInARun)
            ->execute()
            ->fetchAll();
    }

    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
    {
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        return $queryBuilderUpdate
            ->update($this->tableName)
            ->where(
                $queryBuilderUpdate->expr()->in('qid', $quidList)
            )
            ->set('process_scheduled', time())
            ->set('process_id', $processId)
            ->execute();
    }

    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
            )
            ->set('process_scheduled', 0)
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to check whether there are ANY unprocessed queue entries
     * for a given page_id whose configuration matches a given hash.
     * If there are none, we can skip an inner detail check
     *
     * @param int $uid
     * @param string $configurationHash
     * @return boolean
     */
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $noUnprocessedQueueEntriesFound = true;

        $result = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', (int) $uid),
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn();

        if ($result) {
            $noUnprocessedQueueEntriesFound = false;
        }

        return $noUnprocessedQueueEntriesFound;
    }

    /**
     * Removes queue entries
     *
     * @param string $filter all, pending, finished
     */
    public function flushQueue(string $filter): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        switch (strtolower($filter)) {
            case 'all':
                // No WHERE clause needed; delete everything
                break;
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
            default:
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        $queryBuilder
            ->delete($this->tableName)
            ->execute();
    }

    /**
     * This internal helper method is used to fetch the first matching queue entry of a process as an array
     *
     * @param Process $process
     * @param string $orderByField field to order by; the first matching item will be returned
     * @param string $orderBySorting sorting direction
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()->fetch();

        return $first ?: [];
    }
}