Passed
Pull Request — master (#677)
by Tomas Norre
09:30 queued 06:06 created

QueueRepository   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 710
Duplicated Lines 0 %

Test Coverage

Coverage 94.44%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 361
dl 0
loc 710
ccs 340
cts 360
cp 0.9444
rs 3.6
c 1
b 0
f 0
wmc 60
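The page does not expand the abbreviations ccs and cts. The numbers are, however, consistent with ccs being the covered statements and cts the total: cp = ccs / cts = 340 / 360 ≈ 0.9444, which is the 94.44% shown under Test Coverage. Here is a minimal check; the reading of ccs and cts is an assumption.

```php
<?php
declare(strict_types=1);

// Values copied from the metrics listing above.
$ccs = 340; // assumed meaning: covered statements
$cts = 360; // assumed meaning: total statements

$cp = $ccs / $cts;

printf("cp = %.4f (%.2f%%)\n", $cp, $cp * 100); // prints "cp = 0.9444 (94.44%)"
```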

32 Methods

Rating   Name   Duplication   Size   Complexity  
A getLastProcessedEntriesTimestamps() 0 16 2
A getSetIdWithUnprocessedEntries() 0 19 2
A getTotalQueueEntriesByConfiguration() 0 22 3
A countAllPendingItems() 0 14 1
A getPerformanceData() 0 21 2
A countAllUnassignedPendingItems() 0 14 1
A fetchRecordsToBeCrawled() 0 23 1
A findByQueueId() 0 12 2
A countPendingItemsGroupedByConfigurationKey() 0 15 1
A countAllAssignedPendingItems() 0 14 1
A cleanupQueue() 0 18 3
A cleanUpOldQueueEntries() 0 20 2
A noUnprocessedQueueEntriesForPageWithConfigurationHashExist() 0 21 2
A isPageInQueueTimed() 0 3 1
A unsetProcessScheduledAndProcessIdForQueueEntries() 0 12 1
A getLastProcessedEntries() 0 16 2
A getAvailableSets() 0 17 2
B isPageInQueue() 0 40 6
A updateProcessIdAndSchedulerForQueueIds() 0 11 1
A countNonExecutedItemsByProcess() 0 13 1
A getQueueEntriesForPageId() 0 33 4
A unsetQueueProcessId() 0 10 1
A countAllByProcessId() 0 16 1
A getDuplicateQueueItemsIfExists() 0 47 5
A getUnprocessedItems() 0 11 1
A findOldestEntryForProcess() 0 3 1
A flushQueue() 0 20 4
A countUnprocessedItems() 0 7 1
A findYoungestEntryForProcess() 0 3 1
A getFirstOrLastObjectByProcess() 0 15 2
A __construct() 0 6 1
A countExecutedItemsByProcess() 0 13 1
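The 60 reported as Total Complexity and as wmc equals the sum of the Complexity column in the table above. This matches the usual definition of "weighted methods per class": the sum of per-method complexities. The short check below re-adds the column, with values copied from the table in the same order.

```php
<?php
declare(strict_types=1);

// Complexity column of the method table, top to bottom.
$complexities = [
    2, 2, 3, 1, 2, 1, 1, 2, 1, 1,
    3, 2, 2, 1, 1, 2, 2, 6, 1, 1,
    4, 1, 1, 5, 1, 1, 4, 1, 1, 2,
    1, 1,
];

$methodCount = count($complexities);
$wmc = array_sum($complexities);

// prints "32 methods, weighted method count = 60"
printf("%d methods, weighted method count = %d\n", $methodCount, $wmc);
```

The single largest contributor is isPageInQueue() with 6. Most of the total comes from many small methods, which is why the report points at splitting the class rather than simplifying one method.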

How to fix: Complexity

Complex Class

Complex classes like QueueRepository often do many different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefix or suffix. In QueueRepository, for example, the eight count*() methods, such as countAllPendingItems() and countAllAssignedPendingItems(), form an obvious candidate.
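As a rough first pass, you can bucket the method names from the table above by their leading lower-case word. The sketch below does this with a regular expression. It is a heuristic for spotting candidate groups, not a refactoring tool.

```php
<?php
declare(strict_types=1);

// Method names copied from the QueueRepository table above.
$methods = [
    'getLastProcessedEntriesTimestamps', 'getSetIdWithUnprocessedEntries',
    'getTotalQueueEntriesByConfiguration', 'countAllPendingItems', 'getPerformanceData',
    'countAllUnassignedPendingItems', 'fetchRecordsToBeCrawled', 'findByQueueId',
    'countPendingItemsGroupedByConfigurationKey', 'countAllAssignedPendingItems', 'cleanupQueue',
    'cleanUpOldQueueEntries', 'noUnprocessedQueueEntriesForPageWithConfigurationHashExist',
    'isPageInQueueTimed', 'unsetProcessScheduledAndProcessIdForQueueEntries',
    'getLastProcessedEntries', 'getAvailableSets', 'isPageInQueue',
    'updateProcessIdAndSchedulerForQueueIds', 'countNonExecutedItemsByProcess',
    'getQueueEntriesForPageId', 'unsetQueueProcessId', 'countAllByProcessId',
    'getDuplicateQueueItemsIfExists', 'getUnprocessedItems', 'findOldestEntryForProcess',
    'flushQueue', 'countUnprocessedItems', 'findYoungestEntryForProcess',
    'getFirstOrLastObjectByProcess', '__construct', 'countExecutedItemsByProcess',
];

// Bucket each name by its leading run of lower-case letters/underscores,
// e.g. "countAllPendingItems" -> "count", "isPageInQueue" -> "is".
$groups = [];
foreach ($methods as $method) {
    preg_match('/^[a-z_]+/', $method, $match);
    $groups[$match[0]][] = $method;
}

// Largest groups first: these are the first candidates for Extract Class.
uasort($groups, function (array $a, array $b): int {
    return count($b) <=> count($a);
});

foreach ($groups as $prefix => $names) {
    printf("%-12s %2d  %s\n", $prefix, count($names), implode(', ', $names));
}
```

The two largest groups are get (10 methods) and count (8 methods). The output also shows the limits of a purely mechanical pass. cleanupQueue() and cleanUpOldQueueEntries() land in different buckets ("cleanup" and "clean") because their casing is inconsistent, so the grouping still needs a manual review.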

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
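The three countAll*PendingItems() methods in QueueRepository repeat the same two conditions, exec_time = 0 and scheduled <= now. They differ only in one extra filter: process_scheduled = 0, process_id != '', or process_id = ''.

The sketch below applies Extract Class to that group. It makes two assumptions. First, the class name PendingQueueItemCounter is hypothetical. Second, rows are passed in as a plain PHP array so the example runs without TYPO3 or a database. In the real code, the extracted class would hold the QueryBuilder calls instead of filtering arrays.

```php
<?php
declare(strict_types=1);

// Hypothetical class extracted from QueueRepository. It owns the "pending"
// definition shared by the three countAll*PendingItems() methods.
// Rows are plain arrays here (assumption) so the sketch runs standalone.
final class PendingQueueItemCounter
{
    /** @var array<int, array<string, mixed>> */
    private $rows;

    /** @var int */
    private $now;

    public function __construct(array $rows, int $now)
    {
        $this->rows = $rows;
        $this->now = $now;
    }

    public function countAllPendingItems(): int
    {
        return $this->countPending(function (array $row): bool {
            return (int) $row['process_scheduled'] === 0;
        });
    }

    public function countAllAssignedPendingItems(): int
    {
        return $this->countPending(function (array $row): bool {
            return $row['process_id'] !== '';
        });
    }

    public function countAllUnassignedPendingItems(): int
    {
        return $this->countPending(function (array $row): bool {
            return $row['process_id'] === '';
        });
    }

    // Shared condition: not executed yet and scheduled for now or earlier,
    // plus the method-specific extra condition.
    private function countPending(callable $extraCondition): int
    {
        $count = 0;
        foreach ($this->rows as $row) {
            if ((int) $row['exec_time'] === 0
                && (int) $row['scheduled'] <= $this->now
                && $extraCondition($row)
            ) {
                $count++;
            }
        }
        return $count;
    }
}

$rows = [
    ['exec_time' => 0, 'scheduled' => 100, 'process_scheduled' => 0, 'process_id' => ''],
    ['exec_time' => 0, 'scheduled' => 100, 'process_scheduled' => 90, 'process_id' => 'abc'],
    ['exec_time' => 0, 'scheduled' => 150, 'process_scheduled' => 0, 'process_id' => ''],
    ['exec_time' => 0, 'scheduled' => 500, 'process_scheduled' => 0, 'process_id' => ''],      // scheduled in the future
    ['exec_time' => 120, 'scheduled' => 100, 'process_scheduled' => 90, 'process_id' => 'abc'], // already executed
];
$counter = new PendingQueueItemCounter($rows, 200);

// prints "pending=2 assigned=1 unassigned=2"
printf(
    "pending=%d assigned=%d unassigned=%d\n",
    $counter->countAllPendingItems(),
    $counter->countAllAssignedPendingItems(),
    $counter->countAllUnassignedPendingItems()
);
```

The design choice is that the "pending" definition now lives in exactly one private method. A change to it, for example switching lte to lt on scheduled, no longer has to be repeated in three places.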

While breaking up the class, it is a good idea to analyze how other classes use QueueRepository and, based on these observations, to apply Extract Interface as well.
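Extract Interface lets a caller depend on a narrow contract instead of the whole repository. The sketch below assumes three hypothetical names:

- QueuePendingStatistics, an interface covering the three pending counters.
- QueueStatusReport, a consumer that only needs those counters.
- FixedQueuePendingStatistics, a stub standing in for QueueRepository so the example runs standalone.

In the real code, QueueRepository would add `implements QueuePendingStatistics` and keep its existing method bodies.

```php
<?php
declare(strict_types=1);

// Hypothetical narrow interface extracted from QueueRepository's public API.
interface QueuePendingStatistics
{
    public function countAllPendingItems(): int;

    public function countAllAssignedPendingItems(): int;

    public function countAllUnassignedPendingItems(): int;
}

// Hypothetical consumer: depends on the interface, not on the repository class.
final class QueueStatusReport
{
    /** @var QueuePendingStatistics */
    private $statistics;

    public function __construct(QueuePendingStatistics $statistics)
    {
        $this->statistics = $statistics;
    }

    public function summary(): string
    {
        return sprintf(
            'pending: %d, assigned: %d, unassigned: %d',
            $this->statistics->countAllPendingItems(),
            $this->statistics->countAllAssignedPendingItems(),
            $this->statistics->countAllUnassignedPendingItems()
        );
    }
}

// Stub implementation, e.g. for a unit test; no database required.
final class FixedQueuePendingStatistics implements QueuePendingStatistics
{
    public function countAllPendingItems(): int
    {
        return 5;
    }

    public function countAllAssignedPendingItems(): int
    {
        return 2;
    }

    public function countAllUnassignedPendingItems(): int
    {
        return 3;
    }
}

$report = new QueueStatusReport(new FixedQueuePendingStatistics());
echo $report->summary(), "\n"; // prints "pending: 5, assigned: 2, unassigned: 3"
```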

<?php

declare(strict_types=1);

namespace AOE\Crawler\Domain\Repository;

/***************************************************************
 *  Copyright notice
 *
 *  (c) 2020 AOE GmbH <[email protected]>
 *
 *  All rights reserved
 *
 *  This script is part of the TYPO3 project. The TYPO3 project is
 *  free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  The GNU General Public License can be found at
 *  http://www.gnu.org/copyleft/gpl.html.
 *
 *  This script is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  This copyright notice MUST APPEAR in all copies of the script!
 ***************************************************************/

use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
use AOE\Crawler\Domain\Model\Process;
use AOE\Crawler\Value\QueueFilter;
use PDO;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use TYPO3\CMS\Core\Database\Connection;
use TYPO3\CMS\Core\Database\ConnectionPool;
use TYPO3\CMS\Core\Utility\GeneralUtility;
use TYPO3\CMS\Extbase\Object\ObjectManager;
use TYPO3\CMS\Extbase\Persistence\Repository;

class QueueRepository extends Repository implements LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    /**
     * @var array
     */
    protected $extensionSettings;

    public function __construct()
    {
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
        $this->extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();

        parent::__construct($objectManager);
    }

    // TODO: Should be a property on the QueueObject
    public function unsetQueueProcessId(string $processId): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to find the youngest entry for a given process.
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * This method is used to find the oldest entry for a given process.
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Counts all executed items of a process.
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Cast: depending on the driver, fetchColumn() may return the count as a string,
        // which strict_types would reject for the int return type.
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts items of a process which have not been processed/executed yet.
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Gets items which have not been processed yet.
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()->fetchAll();
    }

    /**
     * Counts items which have not been processed yet.
     * @deprecated Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead
     */
    public function countUnprocessedItems(): int
    {
        trigger_error(
            'Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead',
            E_USER_DEPRECATED
        );
        return count($this->getUnprocessedItems());
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date.
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are assigned to a process.
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // The empty string is bound as a parameter: a literal '""' is an identifier,
        // not a string, on platforms with ANSI quoting (e.g. PostgreSQL).
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are not assigned to a process.
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts pending queue entries grouped by configuration key.
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Gets the set IDs that have unprocessed entries.
     *
     * @return array array of set ids
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = intval($row['set_id']);
        }

        return $setIds;
    }

    /**
     * Gets the total number of queue entries per configuration.
     *
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $totals = [];
        if (count($setIds) > 0) {
            $statement = $queryBuilder
                ->from($this->tableName)
                ->selectLiteral('count(*) as c')
                ->addSelect('configuration')
                ->where(
                    // Bind the IDs as an integer list instead of imploding them into the SQL string
                    $queryBuilder->expr()->in('set_id', $queryBuilder->createNamedParameter($setIds, Connection::PARAM_INT_ARRAY)),
                    $queryBuilder->expr()->lt('scheduled', time())
                )
                ->groupBy('configuration')
                ->execute();

            while ($row = $statement->fetch()) {
                $totals[$row['configuration']] = $row['c'];
            }
        }

        return $totals;
    }

    /**
     * Gets the timestamps of the last processed entries.
     *
     * @param int $limit
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row['exec_time'];
        }

        return $rows;
    }

    /**
     * Gets the last processed entries.
     *
     * @param int $limit
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Gets performance statistics data.
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines if a page is queued.
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $isPageInQueue = false;

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently this does not work if the page doesn't exist. See tests
        $count = $statement
            ->execute()
            ->fetchColumn(0);

        if ($count !== false && $count > 0) {
            $isPageInQueue = true;
        }

        return $isPageInQueue;
    }

    /**
     * Checks if a page is in the queue and is timed for a
     * date when it should be crawled.
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        return $this->isPageInQueue($uid, $show_unprocessed);
    }

    public function getAvailableSets(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row;
        }

        return $rows;
    }

    public function findByQueueId(string $queueId): ?array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queueRec = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
            )
            ->execute()
            ->fetch();
        return is_array($queueRec) ? $queueRec : null;
    }

    public function cleanupQueue(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];

        if ($purgeDays > 0) {
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
            $del = $queryBuilderDelete
                ->delete($this->tableName)
                ->where(
                    'exec_time != 0 AND exec_time < ' . $purgeDate
                )->execute();

            if ($del === false) {
                $this->logger->info(
                    'Records could not be deleted.'
                );
            }
        }
    }

    /**
     * Cleans up entries that stayed in the queue for too long. By default, these are:
     * - processed entries that are more than 1.5 days old
     * - scheduled entries that are more than 7 days old
     */
    public function cleanUpOldQueueEntries(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        // Both settings are given in days; 86400 = 24 * 60 * 60 seconds per day
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400;
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;

        $now = time();
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $del = $queryBuilder
            ->delete($this->tableName)
            ->where(
                $condition
            )->execute();

        if ($del === false) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }

    public function fetchRecordsToBeCrawled(int $countInARun)
    {
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        return $queryBuilderSelect
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
            ->from($this->tableName)
            ->leftJoin(
                $this->tableName,
                'pages',
                'p',
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier($this->tableName . '.page_id'))
            )
            ->where(
                $queryBuilderSelect->expr()->eq('exec_time', 0),
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
                $queryBuilderSelect->expr()->lte('scheduled', time())
            )
            ->orderBy('sitemap_priority', 'DESC')
            ->addOrderBy('scheduled')
            ->addOrderBy('qid')
            ->setMaxResults($countInARun)
            ->execute()
            ->fetchAll();
    }

    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
    {
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        return $queryBuilderUpdate
            ->update($this->tableName)
            ->where(
                // Bind the queue IDs as an integer list rather than inlining raw array values
                $queryBuilderUpdate->expr()->in('qid', $queryBuilderUpdate->createNamedParameter($quidList, Connection::PARAM_INT_ARRAY))
            )
            ->set('process_scheduled', time())
            ->set('process_id', $processId)
            ->execute();
    }

    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
            )
            ->set('process_scheduled', 0)
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to check whether there are ANY unprocessed queue entries
     * for a given page_id and the configuration which matches a given hash.
     * If there are none, we can skip an inner detail check.
     *
     * @param int $uid
     * @param string $configurationHash
     * @return bool
     */
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $noUnprocessedQueueEntriesFound = true;

        $result = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', (int) $uid),
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn();

        if ($result) {
            $noUnprocessedQueueEntriesFound = false;
        }

        return $noUnprocessedQueueEntriesFound;
    }

    /**
     * Removes queue entries.
     */
    public function flushQueue(QueueFilter $queueFilter): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Compare the filter's string value explicitly instead of relying on loose object/string comparison
        switch ((string) $queueFilter) {
            case 'all':
                // No WHERE clause needed: delete everything
                break;
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
            default:
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        $queryBuilder
            ->delete($this->tableName)
            ->execute();
    }

    /**
     * @param string $processId
     *
     * @return bool|string
     * @deprecated Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead
     */
    public function countAllByProcessId($processId)
    {
        trigger_error(
            'Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead',
            E_USER_DEPRECATED
        );
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId, \PDO::PARAM_STR))
            )
            ->execute()
            ->fetchColumn(0);
    }

    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
    {
        $rows = [];

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->select('qid')
            ->from($this->tableName);
        // If this entry is scheduled for "now"
        if ($timestamp <= $currentTime) {
            if ($enableTimeslot) {
                $timeBegin = $currentTime - 100;
                $timeEnd = $currentTime + 100;
                $queryBuilder
                    ->where(
                        'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd
                    )
                    ->orWhere(
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
                    );
            } else {
                $queryBuilder
                    ->where(
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
                    );
            }
        } elseif ($timestamp > $currentTime) {
            // Entries with a timestamp in the future need to have the same schedule time
            $queryBuilder
                ->where(
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
                );
        }

        // Explicit comparisons instead of 'NOT exec_time' / 'NOT process_id': the latter
        // relies on MySQL casting strings to numbers, so a process ID starting with a
        // letter would be treated as "not assigned".
        $queryBuilder
            ->andWhere($queryBuilder->expr()->eq('exec_time', 0))
            ->andWhere($queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')))
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));

        $statement = $queryBuilder->execute();

        while ($row = $statement->fetch()) {
            $rows[] = $row['qid'];
        }

        return $rows;
    }

    public function getQueueEntriesForPageId(int $id, int $itemsPerPage, QueueFilter $queueFilter): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($id, PDO::PARAM_INT))
            )
            ->orderBy('scheduled', 'DESC');

        // andWhere() already joins the filter to the page_id condition with AND,
        // so no separate expression builder is needed here.
        switch ((string) $queueFilter) {
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        if ($itemsPerPage > 0) {
            $queryBuilder
                ->setMaxResults($itemsPerPage);
        }

        return $queryBuilder->execute()->fetchAll();
    }

    /**
     * This internal helper method is used to fetch the first (or, with DESC, the last)
     * executed queue entry of a process.
     *
     * @param Process $process
     * @param string $orderByField field to order by; the first matching row is returned as an array
     * @param string $orderBySorting sorting direction
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()->fetch();

        return $first ?: [];
    }
}
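The age cut-offs in cleanUpOldQueueEntries() are plain arithmetic on the two extension settings, which are given in days. The sketch below mirrors the SQL condition that method builds, (exec_time <> 0 AND exec_time < now - processedAge) OR scheduled <= now - scheduledAge, as a pure PHP predicate.

It makes the following assumptions:

- The function name isOldQueueEntry is hypothetical.
- Rows are plain arrays.
- The defaults 1.5 and 7 are taken from the method's docblock.

```php
<?php
declare(strict_types=1);

// Hypothetical helper mirroring the WHERE condition of cleanUpOldQueueEntries().
// Ages are in days, like the cleanUpProcessedAge / cleanUpScheduledAge settings.
function isOldQueueEntry(array $row, int $now, float $processedAgeDays = 1.5, float $scheduledAgeDays = 7.0): bool
{
    $processedCutoff = $now - (int) ($processedAgeDays * 86400); // 1.5 days = 129600 s
    $scheduledCutoff = $now - (int) ($scheduledAgeDays * 86400); // 7 days = 604800 s

    // Branch 1: the entry was executed, and that was too long ago.
    $processedTooLongAgo = (int) $row['exec_time'] !== 0 && (int) $row['exec_time'] < $processedCutoff;
    // Branch 2: the entry was scheduled too long ago (exec_time is not checked).
    $scheduledTooLongAgo = (int) $row['scheduled'] <= $scheduledCutoff;

    return $processedTooLongAgo || $scheduledTooLongAgo;
}

$now = 1000000;
$rows = [
    'processed long ago'    => ['exec_time' => 800000, 'scheduled' => 800000],
    'never crawled, stale'  => ['exec_time' => 0, 'scheduled' => 300000],
    'processed recently'    => ['exec_time' => 950000, 'scheduled' => 940000],
    'waiting to be crawled' => ['exec_time' => 0, 'scheduled' => 900000],
];

foreach ($rows as $label => $row) {
    printf("%-22s %s\n", $label, isOldQueueEntry($row, $now) ? 'delete' : 'keep');
}
```

The second branch does not look at exec_time. An entry scheduled more than seven days ago (with the default setting) is therefore removed whether or not it was ever crawled.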