Passed
Push — features/addFlushedPagesToCraw... ( 44f5f4...d700c8 )
by Tomas Norre
08:39
created

QueueRepository::countAllUnassignedPendingItems()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 10
nc 1
nop 0
dl 0
loc 14
ccs 10
cts 10
cp 1
crap 1
rs 9.9332
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use Psr\Log\LoggerAwareInterface;
34
use TYPO3\CMS\Core\Database\Connection;
35
use TYPO3\CMS\Core\Database\ConnectionPool;
36
use TYPO3\CMS\Core\Utility\GeneralUtility;
37
38
/**
39
 * Class QueueRepository
40
 *
41
 * @package AOE\Crawler\Domain\Repository
42
 */
43
class QueueRepository extends AbstractRepository implements LoggerAwareInterface
44
{
45
    use \Psr\Log\LoggerAwareTrait;
46
47
    /**
48
     * @var string
49
     */
50
    protected $tableName = 'tx_crawler_queue';
51
52 104
    /**
     * Intentionally empty constructor.
     *
     * NOTE(review): presumably declared to bypass the constructor of
     * AbstractRepository — confirm before removing.
     */
    public function __construct()
    {
        // Intentionally left empty.
    }
56
57 3
    /**
     * Resets the process_id column to an empty string for every queue entry
     * that currently carries the given process id.
     */
    public function unsetQueueProcessId(string $processId): void
    {
        $update = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $belongsToProcess = $update->expr()->eq('process_id', $update->createNamedParameter($processId));

        $update->update($this->tableName);
        $update->where($belongsToProcess);
        $update->set('process_id', '');
        $update->execute();
    }
68
69
    /**
     * Returns the executed queue entry of the given process that comes first
     * when sorting by exec_time in ascending order.
     *
     * NOTE(review): ascending order yields the entry with the smallest
     * exec_time, i.e. the one executed earliest — confirm that this is what
     * callers expect from "youngest".
     *
     * @return array the queue row, or an empty array if the process has no executed entries
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        $entry = $this->getFirstOrLastObjectByProcess($process, 'exec_time');

        return $entry;
    }
76
77
    /**
     * Returns the executed queue entry of the given process that comes first
     * when sorting by exec_time in descending order.
     *
     * NOTE(review): descending order yields the entry with the largest
     * exec_time, i.e. the one executed most recently — confirm that this is
     * what callers expect from "oldest".
     *
     * @return array the queue row, or an empty array if the process has no executed entries
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        $entry = $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');

        return $entry;
    }
84
85
    /**
     * Counts all executed items of a process.
     *
     * An item counts as executed when its process_id_completed matches the
     * process and its exec_time is greater than 0.
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand back the count as a numeric string (or false);
        // with strict_types enabled that would violate the int return type.
        return (int) $count;
    }
104
105
    /**
     * Counts the items assigned to a process which have not been executed yet.
     *
     * An item counts as non-executed when its process_id matches the process
     * and its exec_time is 0.
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand back the count as a numeric string (or false);
        // with strict_types enabled that would violate the int return type.
        return (int) $count;
    }
124
125
    /**
     * Fetches all queue rows that have not been processed yet (exec_time is 0).
     *
     * @return array list of complete queue rows
     */
    public function getUnprocessedItems(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $query->select('*');
        $query->from($this->tableName);
        $query->where($query->expr()->eq('exec_time', 0));

        $statement = $query->execute();

        return $statement->fetchAll();
    }
140
141
    /**
     * Counts the items which have not been processed yet (exec_time is 0).
     *
     * The count is computed by the database instead of loading every
     * unprocessed row into memory just to count them.
     */
    public function countUnprocessedItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return the count as a numeric string; the return type is int.
        return (int) $count;
    }
148
149
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time is 0) and have process_scheduled set to 0.
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand back the count as a numeric string (or false);
        // with strict_types enabled that would violate the int return type.
        return (int) $count;
    }
168
169
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed yet and are assigned to a process
     * (process_id is not the empty string).
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter: a raw "" literal is an
                // identifier quote on ANSI-compliant databases, not a string.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return the count as a numeric string; the return type is int.
        return (int) $count;
    }
188
189
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed yet and are not assigned to a process
     * (process_id is the empty string).
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter: a raw "" literal is an
                // identifier quote on ANSI-compliant databases, not a string.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return the count as a numeric string; the return type is int.
        return (int) $count;
    }
208
209
    /**
     * Counts pending queue entries (exec_time is 0, scheduled before now)
     * grouped by configuration key.
     *
     * @return array one row per configuration with the columns
     *               "unprocessed", "assignedButUnprocessed" and "configuration"
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $query->from($this->tableName);
        $query->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed');
        $query->addSelect('configuration');
        $query->where(
            $query->expr()->eq('exec_time', 0),
            $query->expr()->lt('scheduled', time())
        );
        $query->groupBy('configuration');

        return $query->execute()->fetchAll();
    }
228
229
    /**
     * Collects the set ids that still have unprocessed entries
     * (exec_time is 0) scheduled before the current time.
     *
     * @return array list of set ids as integers
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $query
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $query->expr()->lt('scheduled', time()),
                $query->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id');

        $ids = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $ids[] = (int) $record['set_id'];
        }

        return $ids;
    }
254
255
    /**
     * Gets the total number of queue entries per configuration for the given
     * set ids, considering only entries scheduled before the current time.
     *
     * @param array $setIds set ids to restrict the count to
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $totals = [];
        if ($setIds === []) {
            // Nothing to look up; avoids building an invalid "IN ()" clause.
            return $totals;
        }

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                // Bind the ids as an integer list instead of concatenating them
                // into the SQL string, so arbitrary input cannot alter the query.
                $queryBuilder->expr()->in(
                    'set_id',
                    $queryBuilder->createNamedParameter($setIds, Connection::PARAM_INT_ARRAY)
                ),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }
283
284
    /**
     * Gets the exec_time values of the queue entries with the highest
     * exec_time, newest first.
     *
     * @param int $limit maximum number of timestamps to return
     * @return array list of exec_time values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $query
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit);

        $timestamps = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $timestamps[] = $record['exec_time'];
        }

        return $timestamps;
    }
306
307
    /**
     * Gets the complete queue rows with the highest exec_time, newest first.
     *
     * @param int $limit maximum number of rows to return
     * @return array list of queue rows
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $query->from($this->tableName);
        $query->select('*');
        $query->orderBy('exec_time', 'desc');
        $query->setMaxResults($limit);

        $entries = [];
        foreach ($query->execute()->fetchAll() as $entry) {
            $entries[] = $entry;
        }

        return $entries;
    }
329
330
    /**
     * Gets performance statistics per completed process for the entries
     * executed within the given time window.
     *
     * @param int $start timestamp (inclusive lower bound for exec_time)
     * @param int $end timestamp (inclusive upper bound for exec_time)
     *
     * @return array rows with "start", "end", "urlcount" and
     *               "process_id_completed", keyed by process_id_completed
     */
    public function getPerformanceData($start, $end): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $query
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $expr->neq('exec_time', 0),
                $expr->gte('exec_time', $query->createNamedParameter($start, \PDO::PARAM_INT)),
                $expr->lte('exec_time', $query->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed');

        $statistics = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $statistics[$record['process_id_completed']] = $record;
        }

        return $statistics;
    }
360
361
    /**
     * Determines whether at least one queue entry exists for the given page.
     *
     * @param int $uid page id to look for
     * @param bool $unprocessed_only only consider entries with exec_time 0
     * @param bool $timed_only only consider entries whose scheduled value is not 0
     * @param int $timestamp when not 0, only consider entries scheduled for exactly this timestamp
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $constraints = [
            $expr->eq('page_id', $query->createNamedParameter($uid, \PDO::PARAM_INT)),
        ];
        if ($unprocessed_only) {
            $constraints[] = $expr->eq('exec_time', 0);
        }
        if ($timed_only) {
            $constraints[] = $expr->neq('scheduled', 0);
        }
        if ($timestamp !== 0) {
            $constraints[] = $expr->eq('scheduled', $query->createNamedParameter($timestamp, \PDO::PARAM_INT));
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $matches = $query
            ->from($this->tableName)
            ->count('*')
            ->where(...$constraints)
            ->execute()
            ->fetchColumn(0);

        return $matches !== false && $matches > 0;
    }
405
406
    /**
     * Checks whether a page is in the queue with a scheduled time, i.e. it is
     * timed for a date when it should be crawled.
     *
     * Delegates to isPageInQueue() with $timed_only enabled so that only
     * entries whose scheduled value is not 0 are considered; without that flag
     * this method would be indistinguishable from isPageInQueue().
     *
     * @param int $uid page id to look for
     * @param bool $show_unprocessed only consider entries with exec_time 0
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        return $this->isPageInQueue($uid, $show_unprocessed, true);
    }
414
415 1
    /**
     * Lists the distinct set_id/scheduled combinations in the queue together
     * with their entry count, ordered by scheduled descending.
     *
     * @return array rows with the columns "count_value", "set_id" and "scheduled"
     */
    public function getAvailableSets(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $query->selectLiteral('count(*) as count_value');
        $query->addSelect('set_id', 'scheduled');
        $query->from($this->tableName);
        $query->orderBy('scheduled', 'desc');
        $query->groupBy('set_id', 'scheduled');

        $sets = [];
        foreach ($query->execute()->fetchAll() as $set) {
            $sets[] = $set;
        }

        return $sets;
    }
433
434 1
    /**
     * Looks up a single queue row by its qid.
     *
     * @return array|null the queue row, or null when no row matches
     */
    public function findByQueueId(string $queueId): ?array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $query
            ->select('*')
            ->from($this->tableName)
            ->where(
                $query->expr()->eq('qid', $query->createNamedParameter($queueId))
            );

        $record = $query->execute()->fetch();
        if (! is_array($record)) {
            return null;
        }

        return $record;
    }
447
448 1
    /**
     * Deletes executed queue entries that are older than the number of days
     * configured in the "purgeQueueDays" extension setting.
     *
     * Nothing is deleted when the setting is 0 or negative.
     */
    public function cleanupQueue(): void
    {
        $settings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int) $settings['purgeQueueDays'];

        if ($purgeDays <= 0) {
            return;
        }

        // Entries executed before this timestamp are purged.
        $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

        $deleteQuery = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $deleteQuery->delete($this->tableName);
        $deleteQuery->where('exec_time != 0 AND exec_time < ' . $purgeDate);
        $result = $deleteQuery->execute();

        if ($result === false) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }
470
471
    /**
     * Cleans up entries that stayed in the queue for too long, based on the
     * "cleanUpProcessedAge" and "cleanUpScheduledAge" extension settings
     * (both given in days, fractions allowed):
     * - processed entries whose exec_time is older than cleanUpProcessedAge
     * - entries whose scheduled time is at least cleanUpScheduledAge in the past
     *
     * The settings are normalised to whole seconds before they are used in the
     * SQL condition, so non-numeric or float configuration values can neither
     * break the statement nor inject SQL.
     */
    public function cleanUpOldQueueEntries(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();

        // 86400 = 24 * 60 * 60 seconds per day.
        $processedAgeInSeconds = (int) round((float) ($extensionSettings['cleanUpProcessedAge'] ?? 0) * 86400);
        $scheduledAgeInSeconds = (int) round((float) ($extensionSettings['cleanUpScheduledAge'] ?? 0) * 86400);

        $now = time();
        $processedThreshold = $now - $processedAgeInSeconds;
        $scheduledThreshold = $now - $scheduledAgeInSeconds;
        $condition = '(exec_time<>0 AND exec_time<' . $processedThreshold . ') OR scheduled<=' . $scheduledThreshold;

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $del = $queryBuilder
            ->delete($this->tableName)
            ->where(
                $condition
            )->execute();

        if ($del === false) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }
498
499 1
    /**
     * Fetches the next queue entries that are due for crawling: not executed
     * (exec_time is 0), process_scheduled is 0 and scheduled for now or earlier.
     * The result is ordered by scheduled, then qid.
     *
     * @param int $countInARun maximum number of entries to fetch
     * @return array rows with the columns "qid" and "scheduled"
     */
    public function fetchRecordsToBeCrawled(int $countInARun)
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $query->select('qid', 'scheduled');
        $query->from($this->tableName);
        $query->where(
            $expr->eq('exec_time', 0),
            $expr->eq('process_scheduled', 0),
            $expr->lte('scheduled', time())
        );
        $query->orderBy('scheduled');
        $query->addOrderBy('qid');
        $query->setMaxResults($countInARun);

        $statement = $query->execute();

        return $statement->fetchAll();
    }
516
517 1
    /**
     * Assigns the given queue entries to a process: sets process_scheduled to
     * the current time and process_id to the given process id.
     *
     * @param array $quidList list of qid values to update
     * @param string $processId id of the process taking over the entries
     * @return mixed result of executing the update statement; 0 when the list is empty
     */
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
    {
        if ($quidList === []) {
            // An empty list would produce an invalid "qid IN ()" statement.
            return 0;
        }

        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilderUpdate
            ->update($this->tableName)
            ->where(
                // Bind the ids as an integer list instead of concatenating
                // them unescaped into the SQL string.
                $queryBuilderUpdate->expr()->in(
                    'qid',
                    $queryBuilderUpdate->createNamedParameter($quidList, Connection::PARAM_INT_ARRAY)
                )
            )
            ->set('process_scheduled', time())
            ->set('process_id', $processId)
            ->execute();
    }
529
530 1
    /**
     * Releases the not yet executed queue entries (exec_time is 0) of the given
     * processes by resetting process_scheduled to 0 and process_id to an
     * empty string.
     *
     * @param array $processIds process ids whose entries are released
     */
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
    {
        $update = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $notExecuted = $update->expr()->eq('exec_time', 0);
        $ownedByProcesses = $update->expr()->in(
            'process_id',
            $update->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY)
        );

        $update->update($this->tableName);
        $update->where($notExecuted, $ownedByProcesses);
        $update->set('process_scheduled', 0);
        $update->set('process_id', '');
        $update->execute();
    }
543
544
    /**
     * Checks whether there are NO unprocessed queue entries (exec_time is 0)
     * for the given page id whose configuration matches the given hash.
     * If there are none, an inner detail check can be skipped.
     *
     * @param int $uid
     * @param string $configurationHash
     * @return bool true when no matching unprocessed entry exists
     */
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $query->count('*');
        $query->from($this->tableName);
        $query->where(
            $expr->eq('page_id', (int) $uid),
            $expr->eq('configuration_hash', $query->createNamedParameter($configurationHash)),
            $expr->eq('exec_time', 0)
        );

        $matches = $query->execute()->fetchColumn();

        return ! $matches;
    }
575
576
    /**
     * Removes queue entries.
     *
     * "all" deletes every entry, "pending" deletes entries with exec_time 0;
     * "finished" and any other value delete entries with exec_time above 0.
     * The filter is matched case-insensitively.
     *
     * @param string $filter all, pending, finished
     */
    public function flushQueue(string $filter): void
    {
        $deleteQuery = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $normalizedFilter = strtolower($filter);

        if ($normalizedFilter === 'pending') {
            $deleteQuery->andWhere($deleteQuery->expr()->eq('exec_time', 0));
        } elseif ($normalizedFilter !== 'all') {
            // 'finished' and every unknown filter fall back to finished entries;
            // 'all' needs no where clause because everything is deleted.
            $deleteQuery->andWhere($deleteQuery->expr()->gt('exec_time', 0));
        }

        $deleteQuery->delete($this->tableName);
        $deleteQuery->execute();
    }
602
603
    /**
     * Internal helper that fetches the first executed queue row
     * (exec_time greater than 0) completed by the given process, according to
     * the requested ordering.
     *
     * @param Process $process
     * @param string $orderByField field to order by; the first matching row is returned
     * @param string $orderBySorting sorting direction
     * @return array the queue row, or an empty array when nothing matches
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()
            // The argument of fetch() is a fetch mode, not a row index; rely on
            // the statement's default mode instead of passing 0.
            ->fetch();

        return is_array($first) ? $first : [];
    }
626
}
627