Passed
Push — cleanup/crawlercontroller-cli-... ( 1c5632 )
by Tomas Norre
12:41 queued 02:22
created

QueueRepository::cleanupQueue()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.1707

Importance

Changes 0
Metric Value
cc 3
eloc 13
c 0
b 0
f 0
nc 3
nop 0
dl 0
loc 18
ccs 11
cts 15
cp 0.7332
crap 3.1707
rs 9.8333
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use Psr\Log\LoggerAwareInterface;
34
use TYPO3\CMS\Core\Database\ConnectionPool;
35
use TYPO3\CMS\Core\Utility\GeneralUtility;
36
use TYPO3\CMS\Core\Utility\MathUtility;
37
38
/**
 * Repository for the crawler queue table (tx_crawler_queue).
 *
 * Every query is built with a fresh TYPO3 QueryBuilder for the queue table. Results are
 * returned as raw database rows (arrays) or scalar aggregates, not as domain objects.
 *
 * @package AOE\Crawler\Domain\Repository
 */
class QueueRepository extends AbstractRepository implements LoggerAwareInterface
{
    use \Psr\Log\LoggerAwareTrait;

    /**
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    public function __construct()
    {
        // Left empty intentionally: the parent constructor is not invoked here.
        // NOTE(review): presumably to avoid the parent's construction dependencies — confirm.
    }

    /**
     * Removes the process assignment from every queue entry currently assigned to the
     * given process, by setting its process_id column to an empty string.
     *
     * @param string $processId
     */
    public function unsetQueueProcessId(string $processId): void
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * Returns the executed entry of a process with the SMALLEST exec_time.
     *
     * NOTE: despite its name this sorts exec_time ascending, i.e. it returns the entry that
     * was executed first. The name is left unchanged because existing callers may rely on it.
     *
     * @param Process $process
     *
     * @return array the queue row, or an empty array if the process has no executed entries
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * Returns the executed entry of a process with the LARGEST exec_time.
     *
     * NOTE: despite its name this sorts exec_time descending, i.e. it returns the entry that
     * was executed last. The name is left unchanged because existing callers may rely on it.
     *
     * @param Process $process
     *
     * @return array the queue row, or an empty array if the process has no executed entries
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Fetches the first executed queue entry of a process according to the given ordering.
     *
     * Only rows whose process_id_completed equals the process id and whose exec_time is
     * greater than 0 are considered; at most one row is fetched.
     *
     * @param Process $process
     * @param string $orderByField column to order by
     * @param string $orderBySorting sorting direction ('ASC' or 'DESC')
     *
     * @return array the first matching row, or an empty array if nothing matches
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()
            ->fetch();

        return $first ?: [];
    }

    /**
     * Counts the entries a process has executed (process_id_completed matches and
     * exec_time > 0).
     *
     * @param Process $process
     *
     * @return int
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return aggregates as strings; cast so the strict int return type holds.
        return (int)$count;
    }

    /**
     * Counts the entries assigned to a process (process_id matches) that have not been
     * executed yet (exec_time = 0).
     *
     * @param Process $process
     *
     * @return int
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Returns all queue rows that have not been processed yet (exec_time = 0).
     *
     * @return array list of queue rows
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchAll();
    }

    /**
     * Counts the queue rows that have not been processed yet (exec_time = 0).
     *
     * @return int
     */
    public function countUnprocessedItems(): int
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        // Let the database count instead of loading every unprocessed row into memory.
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts all queue entries that are not yet executed, not flagged process_scheduled,
     * and scheduled for now or an earlier date.
     *
     * @return int
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts all queue entries that are scheduled for now or an earlier date, not yet
     * executed, and assigned to a process (non-empty process_id).
     *
     * @return int
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind '' as a parameter: a literal "" is an identifier in ANSI SQL modes.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts all queue entries that are scheduled for now or an earlier date, not yet
     * executed, and not assigned to a process (empty process_id).
     *
     * @return int
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind '' as a parameter: a literal "" is an identifier in ANSI SQL modes.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts pending queue entries (exec_time = 0, scheduled before now) per configuration.
     *
     * Each row contains 'configuration', 'unprocessed' (number of pending entries) and
     * 'assignedButUnprocessed' (how many of those have a non-empty process_id).
     *
     * @return array list of rows, one per configuration
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Returns the distinct set ids that still have unprocessed entries scheduled before now.
     *
     * @return int[] list of set ids
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = (int)$row['set_id'];
        }

        return $setIds;
    }

    /**
     * Counts queue entries (scheduled before now) belonging to the given sets, per configuration.
     *
     * @param array $setIds set ids to include; values are bound as integers
     *
     * @return array map of configuration => entry count; empty when $setIds is empty
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $totals = [];
        if (count($setIds) === 0) {
            return $totals;
        }

        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                // Bind the ids as an integer list instead of concatenating them into the SQL.
                $queryBuilder->expr()->in(
                    'set_id',
                    $queryBuilder->createNamedParameter(
                        array_values($setIds),
                        \TYPO3\CMS\Core\Database\Connection::PARAM_INT_ARRAY
                    )
                ),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }

    /**
     * Returns the exec_time values of the most recent entries, newest first.
     *
     * NOTE(review): no exec_time filter is applied, so unprocessed entries (exec_time = 0)
     * are included when fewer than $limit entries have been processed — confirm intended.
     *
     * @param int $limit maximum number of timestamps to return
     *
     * @return array list of exec_time values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        return array_column($statement->fetchAll(), 'exec_time');
    }

    /**
     * Returns the most recent queue rows ordered by exec_time, newest first.
     *
     * @param int $limit maximum number of rows to return
     *
     * @return array list of queue rows
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Aggregates executed entries with exec_time between $start and $end (inclusive),
     * grouped by the process that completed them.
     *
     * Each row contains 'process_id_completed', 'start' (min exec_time), 'end' (max exec_time)
     * and 'urlcount' (number of entries).
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array rows keyed by process_id_completed
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines whether a page has matching entries in the queue.
     *
     * @param int $uid page uid, matched against the page_id column
     * @param bool $unprocessed_only only consider entries with exec_time = 0
     * @param bool $timed_only only consider entries with a non-zero scheduled time
     * @param int $timestamp when non-zero, only consider entries scheduled exactly at this time
     *
     * @return bool true if at least one matching entry exists
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp !== 0) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $count = $queryBuilder
            ->execute()
            ->fetchColumn(0);

        // fetchColumn() yields false when no row is returned; (int)false === 0.
        return (int)$count > 0;
    }

    /**
     * Checks whether a page is in the queue (intended: entries timed for a crawl date).
     *
     * NOTE(review): this does not pass $timed_only = true to isPageInQueue(), so entries
     * with scheduled = 0 also match. Behaviour left unchanged — confirm whether intended.
     *
     * @param int $uid page uid
     * @param bool $show_unprocessed only consider entries with exec_time = 0
     *
     * @return bool
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        return $this->isPageInQueue($uid, $show_unprocessed);
    }

    /**
     * Lists the distinct (set_id, scheduled) combinations with their entry count
     * ('count_value'), ordered by scheduled descending.
     *
     * @return array list of rows
     */
    public function getAvailableSets(): array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Fetches a single queue row by its qid.
     *
     * @param string $queueId
     *
     * @return array|null the row, or null if no row matches
     */
    public function findByQueueId(string $queueId): ?array
    {
        $queryBuilder = $this->createQueueQueryBuilder();
        $queueRec = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
            )
            ->execute()
            ->fetch();

        return is_array($queueRec) ? $queueRec : null;
    }

    /**
     * Deletes executed queue entries older than the configured number of days.
     *
     * Reads 'purgeQueueDays' from the extension configuration; a missing or non-positive
     * value disables purging. Only rows with exec_time != 0 and exec_time older than the
     * cut-off are deleted.
     */
    public function cleanupQueue(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int)($extensionSettings['purgeQueueDays'] ?? 0);

        if ($purgeDays <= 0) {
            return;
        }

        $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

        $queryBuilder = $this->createQueueQueryBuilder();
        $deleted = $queryBuilder
            ->delete($this->tableName)
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->lt('exec_time', $queryBuilder->createNamedParameter($purgeDate, \PDO::PARAM_INT))
            )
            ->execute();

        // The logger is optional (LoggerAwareTrait leaves it null until injected).
        if ($deleted === false && $this->logger !== null) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }

    /**
     * Creates a fresh QueryBuilder bound to the queue table.
     */
    private function createQueueQueryBuilder(): \TYPO3\CMS\Core\Database\Query\QueryBuilder
    {
        return GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    }
}
532