1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | namespace AOE\Crawler\Domain\Repository; |
||
6 | |||
7 | /*************************************************************** |
||
8 | * Copyright notice |
||
9 | * |
||
10 | * (c) 2020 AOE GmbH <[email protected]> |
||
11 | * |
||
12 | * All rights reserved |
||
13 | * |
||
14 | * This script is part of the TYPO3 project. The TYPO3 project is |
||
15 | * free software; you can redistribute it and/or modify |
||
16 | * it under the terms of the GNU General Public License as published by |
||
17 | * the Free Software Foundation; either version 3 of the License, or |
||
18 | * (at your option) any later version. |
||
19 | * |
||
20 | * The GNU General Public License can be found at |
||
21 | * http://www.gnu.org/copyleft/gpl.html. |
||
22 | * |
||
23 | * This script is distributed in the hope that it will be useful, |
||
24 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
25 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
26 | * GNU General Public License for more details. |
||
27 | * |
||
28 | * This copyright notice MUST APPEAR in all copies of the script! |
||
29 | ***************************************************************/ |
||
30 | |||
31 | use AOE\Crawler\Configuration\ExtensionConfigurationProvider; |
||
32 | use AOE\Crawler\Domain\Model\Process; |
||
33 | use AOE\Crawler\Value\QueueFilter; |
||
34 | use PDO; |
||
35 | use Psr\Log\LoggerAwareInterface; |
||
36 | use Psr\Log\LoggerAwareTrait; |
||
37 | use TYPO3\CMS\Core\Database\Connection; |
||
38 | use TYPO3\CMS\Core\Database\ConnectionPool; |
||
39 | use TYPO3\CMS\Core\Utility\GeneralUtility; |
||
40 | use TYPO3\CMS\Extbase\Object\ObjectManager; |
||
41 | use TYPO3\CMS\Extbase\Persistence\Repository; |
||
42 | |||
43 | class QueueRepository extends Repository implements LoggerAwareInterface |
||
44 | { |
||
45 | use LoggerAwareTrait; |
||
46 | |||
47 | /** |
||
48 | * @var string |
||
49 | */ |
||
50 | protected $tableName = 'tx_crawler_queue'; |
||
51 | |||
52 | /** |
||
53 | * @var array |
||
54 | */ |
||
55 | protected $extensionSettings; |
||
56 | |||
/**
 * Reads the crawler extension configuration into $extensionSettings and
 * hands an ObjectManager instance to the parent repository constructor.
 */
public function __construct()
{
    $extbaseObjectManager = GeneralUtility::makeInstance(ObjectManager::class);

    $configurationProvider = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class);
    $this->extensionSettings = $configurationProvider->getExtensionConfiguration();

    parent::__construct($extbaseObjectManager);
}
64 | |||
// TODO: Should be a property on the QueueObject
/**
 * Resets the process_id column to an empty string on every queue row
 * whose process_id equals the given value.
 */
public function unsetQueueProcessId(string $processId): void
{
    $update = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    // Build the constraint first so the bound parameter keeps its position.
    $belongsToProcess = $update->expr()->eq('process_id', $update->createNamedParameter($processId));

    $update->update($this->tableName);
    $update->where($belongsToProcess);
    $update->set('process_id', '');
    $update->execute();
}
77 | |||
/**
 * This method is used to find the youngest entry for a given process.
 *
 * Delegates to getFirstOrLastObjectByProcess() ordering by exec_time in
 * ascending direction, i.e. the row with the smallest exec_time is returned.
 */
public function findYoungestEntryForProcess(Process $process): array
{
    return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'ASC');
}
85 | |||
/**
 * This method is used to find the oldest entry for a given process.
 *
 * Delegates to getFirstOrLastObjectByProcess() ordering by exec_time in
 * descending direction, i.e. the row with the largest exec_time is returned.
 */
public function findOldestEntryForProcess(Process $process): array
{
    $sortField = 'exec_time';
    $sortDirection = 'DESC';

    return $this->getFirstOrLastObjectByProcess($process, $sortField, $sortDirection);
}
93 | |||
/**
 * Counts all executed items of a process.
 *
 * A row is counted when its process_id_completed equals the id of the
 * given process and its exec_time is greater than 0.
 *
 * @param Process $process
 *
 * @return int number of matching queue rows
 */
public function countExecutedItemsByProcess($process): int
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $count = $queryBuilder
        ->count('*')
        ->from($this->tableName)
        ->where(
            $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
            $queryBuilder->expr()->gt('exec_time', 0)
        )
        ->execute()
        ->fetchColumn(0);

    // fetchColumn() can hand back a numeric string (driver dependent) or
    // false; with strict_types enabled either would violate the declared
    // int return type, so normalise explicitly.
    return (int) $count;
}
113 | |||
/**
 * Counts items of a process which have not yet been processed/executed.
 *
 * A row is counted when its process_id equals the id of the given process
 * and its exec_time is 0.
 *
 * @param Process $process
 *
 * @return int number of matching queue rows
 */
public function countNonExecutedItemsByProcess($process): int
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $count = $queryBuilder
        ->count('*')
        ->from($this->tableName)
        ->where(
            $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
            $queryBuilder->expr()->eq('exec_time', 0)
        )
        ->execute()
        ->fetchColumn(0);

    // fetchColumn() can hand back a numeric string (driver dependent) or
    // false; with strict_types enabled either would violate the declared
    // int return type, so normalise explicitly.
    return (int) $count;
}
133 | |||
/**
 * Returns all queue rows whose exec_time is 0, i.e. items which have not
 * been processed yet.
 */
public function getUnprocessedItems(): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $notExecuted = $select->expr()->eq('exec_time', 0);

    $statement = $select
        ->select('*')
        ->from($this->tableName)
        ->where($notExecuted)
        ->execute();

    return $statement->fetchAll();
}
149 | |||
/**
 * Count items which have not been processed yet.
 *
 * Emits an E_USER_DEPRECATED notice and then counts the result of
 * getUnprocessedItems().
 *
 * @deprecated Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead
 */
public function countUnprocessedItems(): int
{
    $deprecationMessage = 'Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead';
    trigger_error($deprecationMessage, E_USER_DEPRECATED);

    $unprocessedItems = $this->getUnprocessedItems();

    return count($unprocessedItems);
}
162 | |||
/**
 * This method can be used to count all queue entries which are
 * scheduled for now or an earlier date.
 *
 * Only rows with process_scheduled = 0 and exec_time = 0 are counted.
 *
 * @return int number of matching queue rows
 */
public function countAllPendingItems(): int
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $count = $queryBuilder
        ->count('*')
        ->from($this->tableName)
        ->where(
            $queryBuilder->expr()->eq('process_scheduled', 0),
            $queryBuilder->expr()->eq('exec_time', 0),
            $queryBuilder->expr()->lte('scheduled', time())
        )
        ->execute()
        ->fetchColumn(0);

    // fetchColumn() can hand back a numeric string (driver dependent) or
    // false; with strict_types enabled either would violate the declared
    // int return type, so normalise explicitly.
    return (int) $count;
}
182 | |||
/**
 * This method can be used to count all queue entries which are
 * scheduled for now or an earlier date and are assigned to a process.
 *
 * "Assigned" means process_id is not the empty string; only rows with
 * exec_time = 0 are counted.
 *
 * @return int number of matching queue rows
 */
public function countAllAssignedPendingItems(): int
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $count = $queryBuilder
        ->count('*')
        ->from($this->tableName)
        ->where(
            // Bind the empty string instead of embedding a raw "" literal:
            // double quotes denote an identifier on ANSI-quoting platforms.
            $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
            $queryBuilder->expr()->eq('exec_time', 0),
            $queryBuilder->expr()->lte('scheduled', time())
        )
        ->execute()
        ->fetchColumn(0);

    // fetchColumn() can hand back a numeric string (driver dependent) or
    // false; with strict_types enabled either would violate the declared
    // int return type, so normalise explicitly.
    return (int) $count;
}
202 | |||
/**
 * This method can be used to count all queue entries which are
 * scheduled for now or an earlier date and are not assigned to a process.
 *
 * "Not assigned" means process_id is the empty string; only rows with
 * exec_time = 0 are counted.
 *
 * @return int number of matching queue rows
 */
public function countAllUnassignedPendingItems(): int
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $count = $queryBuilder
        ->count('*')
        ->from($this->tableName)
        ->where(
            // Bind the empty string instead of embedding a raw "" literal:
            // double quotes denote an identifier on ANSI-quoting platforms.
            $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
            $queryBuilder->expr()->eq('exec_time', 0),
            $queryBuilder->expr()->lte('scheduled', time())
        )
        ->execute()
        ->fetchColumn(0);

    // fetchColumn() can hand back a numeric string (driver dependent) or
    // false; with strict_types enabled either would violate the declared
    // int return type, so normalise explicitly.
    return (int) $count;
}
222 | |||
/**
 * Count pending queue entries grouped by configuration key.
 *
 * Each returned row carries the configuration, the number of rows with
 * exec_time = 0 scheduled before now ("unprocessed") and how many of those
 * have a non-empty process_id ("assignedButUnprocessed").
 */
public function countPendingItemsGroupedByConfigurationKey(): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $select->expr();

    $select->from($this->tableName);
    $select->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed');
    $select->addSelect('configuration');
    $select->where(
        $expr->eq('exec_time', 0),
        $expr->lt('scheduled', time())
    );
    $select->groupBy('configuration');

    return $select->execute()->fetchAll();
}
242 | |||
/**
 * Get set id with unprocessed entries.
 *
 * Considers rows with exec_time = 0 that were scheduled before now and
 * groups them by set_id.
 *
 * @return array array of set ids (cast to int)
 */
public function getSetIdWithUnprocessedEntries(): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $select->expr();

    $records = $select
        ->select('set_id')
        ->from($this->tableName)
        ->where(
            $expr->lt('scheduled', time()),
            $expr->eq('exec_time', 0)
        )
        ->addGroupBy('set_id')
        ->execute()
        ->fetchAll();

    $collectedSetIds = [];
    foreach ($records as $record) {
        $collectedSetIds[] = (int) $record['set_id'];
    }

    return $collectedSetIds;
}
268 | |||
/**
 * Get total queue entries by configuration.
 *
 * Counts the rows scheduled before now that belong to one of the given
 * set ids, grouped by configuration. An empty $setIds list yields an
 * empty result without touching the database.
 *
 * @param array $setIds set ids to restrict the count to
 *
 * @return array totals by configuration (keys)
 */
public function getTotalQueueEntriesByConfiguration(array $setIds): array
{
    $totals = [];
    if (empty($setIds)) {
        return $totals;
    }

    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    // Bind the ids as an integer list instead of concatenating them into
    // the SQL string, so non-numeric input cannot alter the statement.
    $setIdList = $queryBuilder->createNamedParameter(
        array_map('intval', array_values($setIds)),
        Connection::PARAM_INT_ARRAY
    );

    $statement = $queryBuilder
        ->from($this->tableName)
        ->selectLiteral('count(*) as c')
        ->addSelect('configuration')
        ->where(
            $queryBuilder->expr()->in('set_id', $setIdList),
            $queryBuilder->expr()->lt('scheduled', time())
        )
        ->groupBy('configuration')
        ->execute();

    while ($row = $statement->fetch()) {
        $totals[$row['configuration']] = $row['c'];
    }

    return $totals;
}
297 | |||
/**
 * Get the timestamps of the last processed entries.
 *
 * Returns the exec_time values of the queue rows, ordered by exec_time
 * descending and capped at $limit rows.
 *
 * @param int $limit
 */
public function getLastProcessedEntriesTimestamps($limit = 100): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $records = $select
        ->select('exec_time')
        ->from($this->tableName)
        ->addOrderBy('exec_time', 'desc')
        ->setMaxResults($limit)
        ->execute()
        ->fetchAll();

    // Flatten the result rows into a plain list of timestamps.
    return array_column($records, 'exec_time');
}
320 | |||
/**
 * Get the last processed entries.
 *
 * Returns complete queue rows ordered by exec_time descending, capped at
 * $limit rows.
 *
 * @param int $limit
 */
public function getLastProcessedEntries($limit = 100): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $select->select('*');
    $select->from($this->tableName);
    $select->orderBy('exec_time', 'desc');
    $select->setMaxResults($limit);

    return $select->execute()->fetchAll();
}
343 | |||
/**
 * Get performance statistics data.
 *
 * For every process_id_completed with executed rows inside the given time
 * window, returns the minimum and maximum exec_time ("start"/"end") and
 * the number of rows ("urlcount").
 *
 * @param int $start timestamp
 * @param int $end timestamp
 *
 * @return array performance data, keyed by process_id_completed
 */
public function getPerformanceData($start, $end): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $select->expr();

    $records = $select
        ->from($this->tableName)
        ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
        ->addSelect('process_id_completed')
        ->where(
            $expr->neq('exec_time', 0),
            $expr->gte('exec_time', $select->createNamedParameter($start, \PDO::PARAM_INT)),
            $expr->lte('exec_time', $select->createNamedParameter($end, \PDO::PARAM_INT))
        )
        ->groupBy('process_id_completed')
        ->execute()
        ->fetchAll();

    $performanceByProcess = [];
    foreach ($records as $record) {
        $performanceByProcess[$record['process_id_completed']] = $record;
    }

    return $performanceByProcess;
}
374 | |||
/**
 * Determines if a page is queued.
 *
 * Counts queue rows for the given page id. Optional restrictions:
 * - $unprocessed_only: only rows with exec_time = 0
 * - $timed_only: only rows with scheduled != 0
 * - $timestamp (non-zero): only rows scheduled exactly at that timestamp
 */
public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $select->expr();

    // Collect all constraints up front; they are combined with AND.
    $constraints = [
        $expr->eq('page_id', $select->createNamedParameter($uid, \PDO::PARAM_INT)),
    ];

    if ($unprocessed_only) {
        $constraints[] = $expr->eq('exec_time', 0);
    }

    if ($timed_only) {
        $constraints[] = $expr->neq('scheduled', 0);
    }

    if ($timestamp) {
        $constraints[] = $expr->eq('scheduled', $select->createNamedParameter($timestamp, \PDO::PARAM_INT));
    }

    // TODO: Currently it's not working if page doesn't exists. See tests
    $count = $select
        ->from($this->tableName)
        ->count('*')
        ->where(...$constraints)
        ->execute()
        ->fetchColumn(0);

    return $count !== false && $count > 0;
}
419 | |||
/**
 * Method to check if a page is in the queue which is timed for a
 * date when it should be crawled.
 *
 * NOTE(review): this only forwards $uid and $show_unprocessed to
 * isPageInQueue(); $timed_only keeps its default (false), so no
 * restriction on "scheduled" is applied here - confirm this is intended.
 */
public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
{
    $pageIsQueued = $this->isPageInQueue($uid, $show_unprocessed);

    return $pageIsQueued;
}
428 | |||
/**
 * Returns one row per (set_id, scheduled) combination together with the
 * number of queue rows in it ("count_value"), ordered by scheduled
 * descending.
 */
public function getAvailableSets(): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    $select->selectLiteral('count(*) as count_value');
    $select->addSelect('set_id', 'scheduled');
    $select->from($this->tableName);
    $select->orderBy('scheduled', 'desc');
    $select->groupBy('set_id', 'scheduled');

    return $select->execute()->fetchAll();
}
447 | |||
/**
 * Looks up a single queue row by its qid.
 *
 * @return array|null the row, or null when the fetch did not yield an array
 */
public function findByQueueId(string $queueId): ?array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $matchesQueueId = $select->expr()->eq('qid', $select->createNamedParameter($queueId));

    $record = $select
        ->select('*')
        ->from($this->tableName)
        ->where($matchesQueueId)
        ->execute()
        ->fetch();

    if (! is_array($record)) {
        return null;
    }

    return $record;
}
461 | |||
/**
 * Deletes executed queue rows (exec_time != 0) that are older than the
 * configured number of days ("purgeQueueDays"). Does nothing when that
 * setting is not greater than 0.
 */
public function cleanupQueue(): void
{
    $configuration = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
    $daysToKeep = (int) $configuration['purgeQueueDays'];

    if ($daysToKeep <= 0) {
        return;
    }

    // Everything executed before this timestamp gets purged.
    $threshold = time() - 24 * 60 * 60 * $daysToKeep;

    $deleteResult = GeneralUtility::makeInstance(ConnectionPool::class)
        ->getQueryBuilderForTable($this->tableName)
        ->delete($this->tableName)
        ->where('exec_time != 0 AND exec_time < ' . $threshold)
        ->execute();

    if ($deleteResult === false) {
        $this->logger->info('Records could not be deleted.');
    }
}
484 | |||
/**
 * Cleans up entries that stayed for too long in the queue. These are default:
 * - processed entries that are over 1.5 days in age
 * - scheduled entries that are over 7 days old
 *
 * The actual ages are read from the extension settings
 * "cleanUpProcessedAge" and "cleanUpScheduledAge" (in days).
 */
public function cleanUpOldQueueEntries(): void
{
    $configuration = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();

    // 86400 = 24*60*60 seconds in 24 hours
    $maxProcessedAge = $configuration['cleanUpProcessedAge'] * 86400;
    $maxScheduledAge = $configuration['cleanUpScheduledAge'] * 86400;

    $currentTime = time();
    $processedBefore = $currentTime - $maxProcessedAge;
    $scheduledBefore = $currentTime - $maxScheduledAge;
    $deleteCondition = '(exec_time<>0 AND exec_time<' . $processedBefore . ') OR scheduled<=' . $scheduledBefore;

    $deleteResult = GeneralUtility::makeInstance(ConnectionPool::class)
        ->getQueryBuilderForTable($this->tableName)
        ->delete($this->tableName)
        ->where($deleteCondition)
        ->execute();

    if ($deleteResult === false) {
        $this->logger->info('Records could not be deleted.');
    }
}
513 | |||
/**
 * Fetches up to $countInARun queue rows that are due: exec_time = 0,
 * process_scheduled = 0 and scheduled not later than now.
 *
 * The queue table is left-joined with "pages" and the result is ordered
 * by sitemap_priority (descending), then scheduled, then qid.
 */
public function fetchRecordsToBeCrawled(int $countInARun): array
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $select->expr();

    $joinOnPageUid = $expr->eq('p.uid', $select->quoteIdentifier($this->tableName . '.page_id'));

    $select->select('qid', 'scheduled', 'page_id', 'sitemap_priority');
    $select->from($this->tableName);
    $select->leftJoin($this->tableName, 'pages', 'p', $joinOnPageUid);
    $select->where(
        $expr->eq('exec_time', 0),
        $expr->eq('process_scheduled', 0),
        $expr->lte('scheduled', time())
    );
    $select->orderBy('sitemap_priority', 'DESC');
    $select->addOrderBy('scheduled');
    $select->addOrderBy('qid');
    $select->setMaxResults($countInARun);

    return $select->execute()->fetchAll();
}
538 | |||
/**
 * Marks the given queue rows as taken by a process: sets process_scheduled
 * to the current time and process_id to $processId.
 *
 * @param array $quidList qids of the queue rows to update
 * @param string $processId id of the process that takes over the rows
 *
 * @return mixed result of the update statement's execute() call, or 0 when
 *               $quidList is empty and no statement is issued
 */
public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
{
    // An empty list would render as "IN ()", which is not valid SQL.
    if ($quidList === []) {
        return 0;
    }

    $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

    // Bind the ids as an integer list rather than imploding raw values
    // into the statement.
    $qidParameter = $queryBuilderUpdate->createNamedParameter(
        array_map('intval', array_values($quidList)),
        Connection::PARAM_INT_ARRAY
    );

    return $queryBuilderUpdate
        ->update($this->tableName)
        ->where(
            $queryBuilderUpdate->expr()->in('qid', $qidParameter)
        )
        ->set('process_scheduled', time())
        ->set('process_id', $processId)
        ->execute();
}
551 | |||
/**
 * Releases not yet executed queue rows (exec_time = 0) that belong to one
 * of the given process ids: process_scheduled is reset to 0 and process_id
 * to an empty string.
 */
public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
{
    $update = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $update->expr();

    $notExecuted = $expr->eq('exec_time', 0);
    $ownedByProcesses = $expr->in(
        'process_id',
        $update->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY)
    );

    $update->update($this->tableName);
    $update->where($notExecuted, $ownedByProcesses);
    $update->set('process_scheduled', 0);
    $update->set('process_id', '');
    $update->execute();
}
565 | |||
/**
 * This method is used to count if there are ANY unprocessed queue entries
 * of a given page_id and the configuration which matches a given hash.
 * If there are none, we can skip an inner detail check.
 *
 * @return bool true when the count is empty/zero, false otherwise
 */
public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist(int $uid, string $configurationHash): bool
{
    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $expr = $select->expr();

    $pendingCount = $select
        ->count('*')
        ->from($this->tableName)
        ->where(
            $expr->eq('page_id', $uid),
            $expr->eq('configuration_hash', $select->createNamedParameter($configurationHash)),
            $expr->eq('exec_time', 0)
        )
        ->execute()
        ->fetchColumn();

    // Any truthy count means at least one unprocessed entry exists.
    return ! (bool) $pendingCount;
}
593 | |||
/**
 * Removes queue entries.
 *
 * The filter decides which rows are deleted: 'all' removes everything,
 * 'pending' removes rows with exec_time = 0, and 'finished' (as well as
 * any other value) removes rows with exec_time > 0.
 */
public function flushQueue(QueueFilter $queueFilter): void
{
    $deletion = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $deletion->delete($this->tableName);

    switch ($queueFilter) {
        case 'all':
            // No where clause needed, delete everything
            break;
        case 'pending':
            $deletion->andWhere($deletion->expr()->eq('exec_time', 0));
            break;
        case 'finished':
        default:
            $deletion->andWhere($deletion->expr()->gt('exec_time', 0));
            break;
    }

    $deletion->execute();
}
618 | |||
/**
 * Counts the queue rows whose process_id equals the given value, after
 * emitting an E_USER_DEPRECATED notice.
 *
 * @param string $processId
 *
 * @return bool|string
 * @deprecated Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead
 */
public function countAllByProcessId($processId)
{
    $deprecationMessage = 'Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead';
    trigger_error($deprecationMessage, E_USER_DEPRECATED);

    $select = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $matchesProcess = $select->expr()->eq(
        'process_id',
        $select->createNamedParameter($processId, \PDO::PARAM_STR)
    );

    $statement = $select
        ->count('*')
        ->from($this->tableName)
        ->where($matchesProcess)
        ->execute();

    return $statement->fetchColumn(0);
}
642 | |||
/**
 * Looks for queue rows that duplicate an entry about to be queued.
 *
 * A row qualifies when it has the same page_id and parameters_hash, has
 * not been executed (exec_time = 0), is not assigned to a process
 * (process_id is empty) and matches the scheduling rule:
 * - $timestamp <= $currentTime ("now"): scheduled <= $currentTime, and with
 *   $enableTimeslot additionally anything scheduled within +/- 100 seconds
 *   of $currentTime
 * - $timestamp > $currentTime (future): scheduled equals $timestamp
 *
 * @return array list of qids of the duplicate rows
 */
public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $queryBuilder
        ->select('qid')
        ->from($this->tableName);

    if ($timestamp <= $currentTime) {
        // This entry is scheduled with "now".
        if ($enableTimeslot) {
            $timeBegin = $currentTime - 100;
            $timeEnd = $currentTime + 100;
            $queryBuilder
                ->where(
                    'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
                )
                ->orWhere(
                    $queryBuilder->expr()->lte('scheduled', $currentTime)
                );
        } else {
            $queryBuilder
                ->where(
                    $queryBuilder->expr()->lte('scheduled', $currentTime)
                );
        }
    } else {
        // Entries with a timestamp in the future need to have the same schedule time.
        $queryBuilder
            ->where(
                $queryBuilder->expr()->eq('scheduled', $timestamp)
            );
    }

    // Explicit comparisons instead of raw "NOT exec_time" / "NOT process_id":
    // the latter depends on implicit string-to-number coercion, which
    // evaluates a process id that does not start with a digit as "empty".
    $queryBuilder
        ->andWhere($queryBuilder->expr()->eq('exec_time', 0))
        ->andWhere($queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('', \PDO::PARAM_STR)))
        ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
        ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));

    $statement = $queryBuilder->execute();

    $rows = [];
    while ($row = $statement->fetch()) {
        $rows[] = $row['qid'];
    }

    return $rows;
}
691 | |||
/**
 * Returns the queue rows of a page, newest schedule first.
 *
 * @param int $id page id the rows must belong to
 * @param int $itemsPerPage maximum number of rows; values <= 0 mean no limit
 * @param QueueFilter $queueFilter 'pending' restricts to exec_time = 0,
 *                                 'finished' to exec_time > 0; any other
 *                                 value applies no additional restriction
 */
public function getQueueEntriesForPageId(int $id, int $itemsPerPage, QueueFilter $queueFilter): array
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $queryBuilder
        ->select('*')
        ->from($this->tableName)
        ->where(
            $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($id, PDO::PARAM_INT))
        )
        ->orderBy('scheduled', 'DESC');

    // andWhere() appends to the page_id constraint with AND.
    switch ($queueFilter) {
        case 'pending':
            $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
            break;
        case 'finished':
            $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
            break;
    }

    if ($itemsPerPage > 0) {
        $queryBuilder
            ->setMaxResults($itemsPerPage);
    }

    return $queryBuilder->execute()->fetchAll();
}
726 | |||
/**
 * Internal helper that fetches a single executed queue row of a process.
 *
 * Only rows whose process_id_completed equals the process id and whose
 * exec_time is greater than 0 are considered; the first row according to
 * the requested ordering is returned.
 *
 * @param Process $process
 * @param string $orderByField first matching item will be returned
 * @param string $orderBySorting sorting direction
 *
 * @return array the matching row, or an empty array when nothing matched
 */
protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
{
    $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    $first = $queryBuilder
        ->select('*')
        ->from($this->tableName)
        ->where(
            $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
            $queryBuilder->expr()->gt('exec_time', 0)
        )
        ->setMaxResults(1)
        ->addOrderBy($orderByField, $orderBySorting)
        ->execute()
        // No explicit fetch mode: 0 is not a valid fetch-mode constant, the
        // statement's default mode is what is wanted here.
        ->fetch();

    return $first ?: [];
}
750 | } |
||
751 |
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.
This is most likely a typographical error or the method has been renamed.