Completed
Push — typo3v9 ( aea555...37a7d2 )
by Tomas Norre
06:20
created

getLastProcessedEntriesTimestamps()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
nc 2
nop 1
dl 0
loc 17
ccs 0
cts 15
cp 0
crap 6
rs 9.7
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2019 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Domain\Model\Process;
32
use AOE\Crawler\Domain\Model\Queue;
33
use TYPO3\CMS\Core\Database\ConnectionPool;
34
use TYPO3\CMS\Core\Utility\GeneralUtility;
35
use TYPO3\CMS\Core\Utility\MathUtility;
36
37
/**
38
 * Class QueueRepository
39
 *
40
 * @package AOE\Crawler\Domain\Repository
41
 */
42
class QueueRepository extends AbstractRepository
43
{
44
    /**
45
     * @var string
46
     */
47
    protected $tableName = 'tx_crawler_queue';
48
49
    /**
     * Deliberately empty constructor.
     */
    public function __construct()
    {
        // Intentionally left empty.
    }
53
54
    /**
     * Resets the process_id column to an empty string for every queue entry
     * whose process_id equals the given value.
     *
     * @param mixed $processId value compared against the process_id column
     */
    public function unsetQueueProcessId($processId): void
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $belongsToProcess = $builder->expr()->eq('process_id', $builder->createNamedParameter($processId));

        $builder->update($this->tableName);
        $builder->where($belongsToProcess);
        $builder->set('process_id', '');
        $builder->execute();
    }
68
69
    /**
     * Returns the executed queue entry of the given process with the lowest exec_time.
     *
     * The lookup sorts by exec_time ascending, so "youngest" here means the entry
     * that was executed first.
     *
     * @param Process $process
     *
     * @return array the queue row, or an empty array if the process has no executed entry
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        $firstExecutedEntry = $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'ASC');

        return $firstExecutedEntry;
    }
80
81
    /**
     * Returns the executed queue entry of the given process with the highest exec_time.
     *
     * The lookup sorts by exec_time descending, so "oldest" here means the entry
     * that was executed last.
     *
     * @param Process $process
     *
     * @return array the queue row, or an empty array if the process has no executed entry
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        $lastExecutedEntry = $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');

        return $lastExecutedEntry;
    }
92
93
    /**
     * Fetches a single executed queue row (exec_time > 0) that was completed by the given
     * process, picking the first row according to the requested ordering.
     *
     * @param Process $process process whose id is matched against process_id_completed
     * @param string $orderByField column used for ordering; the first matching row is returned
     * @param string $orderBySorting sorting direction ('ASC' or 'DESC')
     *
     * @return array the queue row, or an empty array if nothing matched
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $row = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()
            // No explicit fetch mode: 0 is not one of Doctrine's FetchMode values,
            // so rely on the statement's default fetch mode instead.
            ->fetch();

        // fetch() yields false when there is no row; callers expect an array in every case.
        return $row ?: [];
    }
118
119
    /**
     * Counts the queue entries that were completed by the given process
     * (process_id_completed matches and exec_time > 0).
     *
     * @param Process $process
     *
     * @return int
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand the count back as a numeric string (or false); with
        // strict_types enabled, returning that from an int-typed method raises a TypeError.
        return (int) $count;
    }
140
141
    /**
     * Counts the queue entries that are assigned to the given process but have not
     * been executed yet (process_id matches and exec_time = 0).
     *
     * @param Process $process
     *
     * @return int
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand the count back as a numeric string (or false); with
        // strict_types enabled, returning that from an int-typed method raises a TypeError.
        return (int) $count;
    }
162
163
    /**
     * Returns all queue rows that have not been executed yet (exec_time = 0).
     *
     * @return array list of queue rows
     */
    public function getUnprocessedItems(): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $notExecuted = $builder->expr()->eq('exec_time', 0);

        $statement = $builder
            ->select('*')
            ->from($this->tableName)
            ->where($notExecuted)
            ->execute();

        return $statement->fetchAll();
    }
180
181
    /**
     * Counts the queue entries that have not been executed yet (exec_time = 0).
     *
     * Uses the same condition as getUnprocessedItems(), but lets the database do the
     * counting instead of loading every unprocessed row into PHP just to count them.
     *
     * @return int
     */
    public function countUnprocessedItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // The driver may return the count as a numeric string; the method is declared to return int.
        return (int) $count;
    }
190
191
    /**
     * Counts all queue entries that are scheduled for now or an earlier date, have not
     * been executed yet (exec_time = 0) and have process_scheduled = 0.
     *
     * @return int
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand the count back as a numeric string (or false); with
        // strict_types enabled, returning that from an int-typed method raises a TypeError.
        return (int) $count;
    }
212
213
    /**
     * Counts all queue entries that are scheduled for now or an earlier date, have not
     * been executed yet (exec_time = 0) and are assigned to a process (process_id is
     * not the empty string).
     *
     * @return int
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter: a raw "" in the SQL is only a string
                // literal on MySQL without ANSI_QUOTES and a delimited identifier elsewhere.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand the count back as a numeric string (or false); with
        // strict_types enabled, returning that from an int-typed method raises a TypeError.
        return (int) $count;
    }
234
235
    /**
     * Counts all queue entries that are scheduled for now or an earlier date, have not
     * been executed yet (exec_time = 0) and are not assigned to a process (process_id is
     * the empty string).
     *
     * @return int
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter: a raw "" in the SQL is only a string
                // literal on MySQL without ANSI_QUOTES and a delimited identifier elsewhere.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Database drivers may hand the count back as a numeric string (or false); with
        // strict_types enabled, returning that from an int-typed method raises a TypeError.
        return (int) $count;
    }
256
257
    /**
     * Aggregates the not yet executed queue entries (exec_time = 0) whose scheduled
     * time lies before the current time, grouped by configuration.
     *
     * @return array one row per configuration with the keys "unprocessed",
     *               "assignedButUnprocessed" and "configuration"
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $builder->expr();

        $builder
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->from($this->tableName)
            ->where(
                $expr->eq('exec_time', 0),
                $expr->lt('scheduled', time())
            )
            ->groupBy('configuration');

        return $builder->execute()->fetchAll();
    }
278
279
    /**
     * Collects the distinct set ids of queue entries that are not executed yet
     * (exec_time = 0) and whose scheduled time lies before the current time.
     *
     * @return array list of set ids as integers
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $records = $builder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $builder->expr()->lt('scheduled', time()),
                $builder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute()
            ->fetchAll();

        return array_map(
            static function (array $record): int {
                return (int) $record['set_id'];
            },
            $records
        );
    }
304
305
    /**
     * Counts the queue entries of the given sets whose scheduled time lies before the
     * current time, grouped by configuration.
     *
     * @param array $setIds set ids to include; every value is converted to an integer
     *
     * @return array totals by configuration (configuration => count); empty if no set ids were given
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        if (count($setIds) === 0) {
            return [];
        }

        // The id list is placed into the IN (...) clause as plain SQL, so force every
        // value to an integer to rule out injecting arbitrary SQL through $setIds.
        $setIdList = implode(',', array_map('intval', $setIds));

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->in('set_id', $setIdList),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        $totals = [];
        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }
335
336
    /**
     * Returns the exec_time values of the queue rows with the highest exec_time,
     * ordered from highest to lowest.
     *
     * No condition on exec_time is applied, so rows with exec_time = 0 are part of
     * the result when fewer than $limit rows have a higher value.
     *
     * @param int $limit maximum number of timestamps to return
     *
     * @return array list of exec_time values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $records = $builder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute()
            ->fetchAll();

        $timestamps = [];
        foreach ($records as $record) {
            $timestamps[] = $record['exec_time'];
        }

        return $timestamps;
    }
360
361
    /**
     * Returns the complete queue rows with the highest exec_time, ordered from
     * highest to lowest.
     *
     * @param int $limit maximum number of rows to return
     *
     * @return array list of queue rows
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $builder
            ->select('*')
            ->from($this->tableName)
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit);

        $entries = [];
        foreach ($builder->execute()->fetchAll() as $entry) {
            $entries[] = $entry;
        }

        return $entries;
    }
385
386
    /**
     * Aggregates executed queue entries (exec_time not 0) whose exec_time lies within
     * [$start, $end], grouped by the process that completed them.
     *
     * @param int $start lower bound timestamp (inclusive)
     * @param int $end upper bound timestamp (inclusive)
     *
     * @return array rows keyed by process_id_completed, each holding "start", "end",
     *               "urlcount" and "process_id_completed"
     */
    public function getPerformanceData($start, $end): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $builder->expr();

        $records = $builder
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->from($this->tableName)
            ->where(
                $expr->neq('exec_time', 0),
                $expr->gte('exec_time', $builder->createNamedParameter($start, \PDO::PARAM_INT)),
                $expr->lte('exec_time', $builder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute()
            ->fetchAll();

        $performanceByProcess = [];
        foreach ($records as $record) {
            $performanceByProcess[$record['process_id_completed']] = $record;
        }

        return $performanceByProcess;
    }
416
417
    /**
     * Determines whether at least one queue entry exists for the given page.
     *
     * @param mixed $uid page uid; must be interpretable as an integer
     * @param bool $unprocessed_only unless false, only entries with exec_time = 0 are considered
     * @param bool $timed_only unless false, only entries with scheduled != 0 are considered
     * @param int|bool $timestamp unless false, only entries scheduled for exactly this timestamp are considered
     *
     * @return bool
     *
     * @throws \InvalidArgumentException if $uid cannot be interpreted as an integer
     */
    public function isPageInQueue($uid, $unprocessed_only = true, $timed_only = false, $timestamp = false): bool
    {
        if (!MathUtility::canBeInterpretedAsInteger($uid)) {
            throw new \InvalidArgumentException('Invalid parameter type', 1468931945);
        }

        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $builder->expr();

        // Mandatory page constraint first, then the optional filters in a fixed order.
        $constraints = [
            $expr->eq('page_id', $builder->createNamedParameter($uid, \PDO::PARAM_INT)),
        ];
        if ($unprocessed_only !== false) {
            $constraints[] = $expr->eq('exec_time', 0);
        }
        if ($timed_only !== false) {
            $constraints[] = $expr->neq('scheduled', 0);
        }
        if ($timestamp !== false) {
            $constraints[] = $expr->eq('scheduled', $builder->createNamedParameter($timestamp, \PDO::PARAM_INT));
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $count = $builder
            ->count('*')
            ->from($this->tableName)
            ->where(...$constraints)
            ->execute()
            ->fetchColumn(0);

        return $count !== false && $count > 0;
    }
472
473
    /**
     * Checks whether a page is in the queue with a scheduled time set, i.e. timed
     * for a date when it should be crawled.
     *
     * @param int $uid uid of the page
     * @param bool $show_unprocessed only respect unprocessed pages
     *
     * @return bool
     */
    public function isPageInQueueTimed($uid, $show_unprocessed = true): bool
    {
        $uid = intval($uid);

        // Forward the timed-only flag; without it this method is indistinguishable from
        // isPageInQueue() and ignores whether the entry is timed at all.
        return $this->isPageInQueue($uid, $show_unprocessed, true);
    }
488
489
    /**
     * Lists the combinations of set_id and scheduled present in the queue together with
     * the number of entries per combination, ordered by scheduled descending.
     *
     * @return array rows with the keys "count_value", "set_id" and "scheduled"
     */
    public function getAvailableSets(): array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $builder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->groupBy('set_id', 'scheduled')
            ->orderBy('scheduled', 'desc');

        $sets = [];
        foreach ($builder->execute()->fetchAll() as $set) {
            $sets[] = $set;
        }

        return $sets;
    }
510
511
    /**
     * Looks up a single queue row by its qid.
     *
     * @param string $queueId value matched against the qid column
     *
     * @return array|null the queue row, or null if no row was fetched
     */
    public function findByQueueId(string $queueId): ?array
    {
        $builder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $matchesQueueId = $builder->expr()->eq('qid', $builder->createNamedParameter($queueId));

        $record = $builder
            ->select('*')
            ->from($this->tableName)
            ->where($matchesQueueId)
            ->execute()
            ->fetch();

        if (!is_array($record)) {
            return null;
        }

        return $record;
    }
524
}
525