Completed
Push — master ( b7ef5d...9350a6 )
by
unknown
28:42 queued 10s
created
lib/private/BackgroundJob/JobList.php 1 patch
Indentation   +410 added lines, -410 removed lines patch added patch discarded remove patch
@@ -26,414 +26,414 @@
 block discarded – undo
26 26
 use function strlen;
27 27
 
28 28
 class JobList implements IJobList {
29
-	/** @var array<string, string> */
30
-	protected array $alreadyVisitedParallelBlocked = [];
31
-
32
-	public function __construct(
33
-		protected readonly IDBConnection $connection,
34
-		protected readonly IConfig $config,
35
-		protected readonly ITimeFactory $timeFactory,
36
-		protected readonly LoggerInterface $logger,
37
-		protected readonly IGenerator $generator,
38
-	) {
39
-	}
40
-
41
-	#[Override]
42
-	public function add(IJob|string $job, mixed $argument = null, ?int $firstCheck = null): void {
43
-		if ($firstCheck === null) {
44
-			$firstCheck = $this->timeFactory->getTime();
45
-		}
46
-
47
-		$class = ($job instanceof IJob) ? get_class($job) : $job;
48
-
49
-		$argumentJson = json_encode($argument);
50
-		if (strlen($argumentJson) > 4000) {
51
-			throw new \InvalidArgumentException('Background job arguments can\'t exceed 4000 characters (json encoded)');
52
-		}
53
-
54
-		$query = $this->connection->getQueryBuilder();
55
-		if (!$this->has($job, $argument)) {
56
-			$query->insert('jobs')
57
-				->values([
58
-					'id' => $query->createNamedParameter($this->generator->nextId()),
59
-					'class' => $query->createNamedParameter($class),
60
-					'argument' => $query->createNamedParameter($argumentJson),
61
-					'argument_hash' => $query->createNamedParameter(hash('sha256', $argumentJson)),
62
-					'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT),
63
-					'last_checked' => $query->createNamedParameter($firstCheck, IQueryBuilder::PARAM_INT),
64
-				]);
65
-		} else {
66
-			$query->update('jobs')
67
-				->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT))
68
-				->set('last_checked', $query->createNamedParameter($firstCheck, IQueryBuilder::PARAM_INT))
69
-				->set('last_run', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
70
-				->where($query->expr()->eq('class', $query->createNamedParameter($class)))
71
-				->andWhere($query->expr()->eq('argument_hash', $query->createNamedParameter(hash('sha256', $argumentJson))));
72
-		}
73
-		$query->executeStatement();
74
-	}
75
-
76
-	public function scheduleAfter(string $job, int $runAfter, mixed $argument = null): void {
77
-		$this->add($job, $argument, $runAfter);
78
-	}
79
-
80
-	#[Override]
81
-	public function remove(IJob|string $job, mixed $argument = null): void {
82
-		$class = ($job instanceof IJob) ? get_class($job) : $job;
83
-
84
-		$query = $this->connection->getQueryBuilder();
85
-		$query->delete('jobs')
86
-			->where($query->expr()->eq('class', $query->createNamedParameter($class)));
87
-		if (!is_null($argument)) {
88
-			$argumentJson = json_encode($argument);
89
-			$query->andWhere($query->expr()->eq('argument_hash', $query->createNamedParameter(hash('sha256', $argumentJson))));
90
-		}
91
-
92
-		// Add galera safe delete chunking if using mysql
93
-		// Stops us hitting wsrep_max_ws_rows when large row counts are deleted
94
-		if ($this->connection->getDatabaseProvider() === IDBConnection::PLATFORM_MYSQL) {
95
-			// Then use chunked delete
96
-			$max = IQueryBuilder::MAX_ROW_DELETION;
97
-
98
-			$query->setMaxResults($max);
99
-
100
-			do {
101
-				$deleted = $query->executeStatement();
102
-			} while ($deleted === $max);
103
-		} else {
104
-			// Dont use chunked delete - let the DB handle the large row count natively
105
-			$query->executeStatement();
106
-		}
107
-	}
108
-
109
-	#[Override]
110
-	public function removeById(string $id): void {
111
-		$query = $this->connection->getQueryBuilder();
112
-		$query->delete('jobs')
113
-			->where($query->expr()->eq('id', $query->createNamedParameter($id, IQueryBuilder::PARAM_INT)));
114
-		$query->executeStatement();
115
-	}
116
-
117
-	#[Override]
118
-	public function has(IJob|string $job, mixed $argument): bool {
119
-		$class = ($job instanceof IJob) ? get_class($job) : $job;
120
-		$argument = json_encode($argument);
121
-
122
-		$query = $this->connection->getQueryBuilder();
123
-		$query->select('id')
124
-			->from('jobs')
125
-			->where($query->expr()->eq('class', $query->createNamedParameter($class)))
126
-			->andWhere($query->expr()->eq('argument_hash', $query->createNamedParameter(hash('sha256', $argument))))
127
-			->setMaxResults(1);
128
-
129
-		$result = $query->executeQuery();
130
-		$row = $result->fetch();
131
-		$result->closeCursor();
132
-
133
-		return (bool)$row;
134
-	}
135
-
136
-	#[Override]
137
-	public function getJobs(IJob|string|null $job, ?int $limit, int $offset): array {
138
-		$iterable = $this->getJobsIterator($job, $limit, $offset);
139
-		return (is_array($iterable))
140
-			? $iterable
141
-			: iterator_to_array($iterable);
142
-	}
143
-
144
-	#[Override]
145
-	public function getJobsIterator(IJob|string|null $job, ?int $limit, int $offset): iterable {
146
-		$query = $this->connection->getQueryBuilder();
147
-		$query->select('*')
148
-			->from('jobs')
149
-			->setMaxResults($limit)
150
-			->setFirstResult($offset);
151
-
152
-		if ($job !== null) {
153
-			$class = ($job instanceof IJob) ? get_class($job) : $job;
154
-			$query->where($query->expr()->eq('class', $query->createNamedParameter($class)));
155
-		}
156
-
157
-		$result = $query->executeQuery();
158
-
159
-		while ($row = $result->fetch()) {
160
-			$job = $this->buildJob($row);
161
-			if ($job) {
162
-				yield $job;
163
-			}
164
-		}
165
-		$result->closeCursor();
166
-	}
167
-
168
-	#[Override]
169
-	public function getNext(bool $onlyTimeSensitive = false, ?array $jobClasses = null): ?IJob {
170
-		$query = $this->connection->getQueryBuilder();
171
-		$query->select('*')
172
-			->from('jobs')
173
-			->where($query->expr()->lte('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 12 * 3600, IQueryBuilder::PARAM_INT)))
174
-			->andWhere($query->expr()->lte('last_checked', $query->createNamedParameter($this->timeFactory->getTime(), IQueryBuilder::PARAM_INT)))
175
-			->orderBy('last_checked', 'ASC')
176
-			->setMaxResults(1);
177
-
178
-		if ($onlyTimeSensitive) {
179
-			$query->andWhere($query->expr()->eq('time_sensitive', $query->createNamedParameter(IJob::TIME_SENSITIVE, IQueryBuilder::PARAM_INT)));
180
-		}
181
-
182
-		if (!empty($jobClasses)) {
183
-			$orClasses = [];
184
-			foreach ($jobClasses as $jobClass) {
185
-				$orClasses[] = $query->expr()->eq('class', $query->createNamedParameter($jobClass, IQueryBuilder::PARAM_STR));
186
-			}
187
-			$query->andWhere($query->expr()->orX(...$orClasses));
188
-		}
189
-
190
-		$result = $query->executeQuery();
191
-		$row = $result->fetch();
192
-		$result->closeCursor();
193
-
194
-		if ($row) {
195
-			$job = $this->buildJob($row);
196
-
197
-			if ($job instanceof IParallelAwareJob && !$job->getAllowParallelRuns() && $this->hasReservedJob(get_class($job))) {
198
-				if (!isset($this->alreadyVisitedParallelBlocked[get_class($job)])) {
199
-					$this->alreadyVisitedParallelBlocked[get_class($job)] = $job->getId();
200
-				} elseif ($this->alreadyVisitedParallelBlocked[get_class($job)] === $job->getId()) {
201
-					$this->logger->info('Skipped through all jobs and revisited a IParallelAwareJob blocked job again, giving up.', ['app' => 'cron']);
202
-					return null;
203
-				}
204
-				$this->logger->info('Skipping ' . get_class($job) . ' job with ID ' . $job->getId() . ' because another job with the same class is already running', ['app' => 'cron']);
205
-
206
-				$update = $this->connection->getQueryBuilder();
207
-				$update->update('jobs')
208
-					->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime() + 1))
209
-					->where($update->expr()->eq('id', $update->createParameter('jobid')));
210
-				$update->setParameter('jobid', $row['id']);
211
-				$update->executeStatement();
212
-
213
-				return $this->getNext($onlyTimeSensitive, $jobClasses);
214
-			}
215
-
216
-			if ($job !== null && isset($this->alreadyVisitedParallelBlocked[get_class($job)])) {
217
-				unset($this->alreadyVisitedParallelBlocked[get_class($job)]);
218
-			}
219
-
220
-			if ($job instanceof \OCP\BackgroundJob\TimedJob) {
221
-				$now = $this->timeFactory->getTime();
222
-				$nextPossibleRun = $job->getLastRun() + $job->getInterval();
223
-				if ($now < $nextPossibleRun) {
224
-					// This job is not ready for execution yet. Set timestamps to the future to avoid
225
-					// re-checking with every cron run.
226
-					// To avoid bugs that lead to jobs never executing again, the future timestamp is
227
-					// capped at two days.
228
-					$nextCheck = min($nextPossibleRun, $now + 48 * 3600);
229
-					$updateTimedJob = $this->connection->getQueryBuilder();
230
-					$updateTimedJob->update('jobs')
231
-						->set('last_checked', $updateTimedJob->createNamedParameter($nextCheck, IQueryBuilder::PARAM_INT))
232
-						->where($updateTimedJob->expr()->eq('id', $updateTimedJob->createParameter('jobid')));
233
-					$updateTimedJob->setParameter('jobid', $row['id']);
234
-					$updateTimedJob->executeStatement();
235
-
236
-					return $this->getNext($onlyTimeSensitive, $jobClasses);
237
-				}
238
-			}
239
-
240
-			$update = $this->connection->getQueryBuilder();
241
-			$update->update('jobs')
242
-				->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime()))
243
-				->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime()))
244
-				->where($update->expr()->eq('id', $update->createParameter('jobid')))
245
-				->andWhere($update->expr()->eq('reserved_at', $update->createParameter('reserved_at')))
246
-				->andWhere($update->expr()->eq('last_checked', $update->createParameter('last_checked')));
247
-			$update->setParameter('jobid', $row['id']);
248
-			$update->setParameter('reserved_at', $row['reserved_at']);
249
-			$update->setParameter('last_checked', $row['last_checked']);
250
-			$count = $update->executeStatement();
251
-
252
-			if ($count === 0) {
253
-				// Background job already executed elsewhere, try again.
254
-				return $this->getNext($onlyTimeSensitive, $jobClasses);
255
-			}
256
-
257
-			if ($job === null) {
258
-				// set the last_checked to 12h in the future to not check failing jobs all over again
259
-				$reset = $this->connection->getQueryBuilder();
260
-				$reset->update('jobs')
261
-					->set('reserved_at', $reset->expr()->literal(0, IQueryBuilder::PARAM_INT))
262
-					->set('last_checked', $reset->createNamedParameter($this->timeFactory->getTime() + 12 * 3600, IQueryBuilder::PARAM_INT))
263
-					->where($reset->expr()->eq('id', $reset->createNamedParameter($row['id'], IQueryBuilder::PARAM_INT)));
264
-				$reset->executeStatement();
265
-
266
-				// Background job from disabled app, try again.
267
-				return $this->getNext($onlyTimeSensitive, $jobClasses);
268
-			}
269
-
270
-			return $job;
271
-		} else {
272
-			return null;
273
-		}
274
-	}
275
-
276
-	#[Override]
277
-	public function getById(string $id): ?IJob {
278
-		$row = $this->getDetailsById($id);
279
-
280
-		if ($row) {
281
-			return $this->buildJob($row);
282
-		}
283
-
284
-		return null;
285
-	}
286
-
287
-	#[Override]
288
-	public function getDetailsById(string $id): ?array {
289
-		$query = $this->connection->getQueryBuilder();
290
-		$query->select('*')
291
-			->from('jobs')
292
-			->where($query->expr()->eq('id', $query->createNamedParameter($id, IQueryBuilder::PARAM_INT)));
293
-		$result = $query->executeQuery();
294
-		$row = $result->fetch();
295
-		$result->closeCursor();
296
-
297
-		if ($row) {
298
-			return $row;
299
-		}
300
-
301
-		return null;
302
-	}
303
-
304
-	/**
305
-	 * get the job object from a row in the db
306
-	 *
307
-	 * @param array{class:class-string<IJob>, id:mixed, last_run:mixed, argument:string} $row
308
-	 * @return ?IJob the next job to run. Beware that this object may be a singleton and may be modified by the next call to buildJob.
309
-	 */
310
-	private function buildJob(array $row): ?IJob {
311
-		try {
312
-			try {
313
-				// Try to load the job as a service
314
-				/** @var IJob $job */
315
-				$job = \OCP\Server::get($row['class']);
316
-			} catch (ContainerExceptionInterface $e) {
317
-				if (class_exists($row['class'])) {
318
-					$class = $row['class'];
319
-					$job = new $class();
320
-				} else {
321
-					$this->logger->warning('failed to create instance of background job: ' . $row['class'], ['app' => 'cron', 'exception' => $e]);
322
-					// Remove job from disabled app or old version of an app
323
-					$this->removeById($row['id']);
324
-					return null;
325
-				}
326
-			}
327
-
328
-			if (!($job instanceof IJob)) {
329
-				// This most likely means an invalid job was enqueued. We can ignore it.
330
-				return null;
331
-			}
332
-			$job->setId($row['id']);
333
-			$job->setLastRun((int)$row['last_run']);
334
-			$job->setArgument(json_decode($row['argument'], true));
335
-			return $job;
336
-		} catch (AutoloadNotAllowedException $e) {
337
-			// job is from a disabled app, ignore
338
-			return null;
339
-		}
340
-	}
341
-
342
-	/**
343
-	 * set the job that was last ran
344
-	 */
345
-	public function setLastJob(IJob $job): void {
346
-		$this->unlockJob($job);
347
-		$this->config->setAppValue('backgroundjob', 'lastjob', $job->getId());
348
-	}
349
-
350
-	#[Override]
351
-	public function unlockJob(IJob $job): void {
352
-		$query = $this->connection->getQueryBuilder();
353
-		$query->update('jobs')
354
-			->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT))
355
-			->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
356
-		$query->executeStatement();
357
-	}
358
-
359
-	#[Override]
360
-	public function setLastRun(IJob $job): void {
361
-		$query = $this->connection->getQueryBuilder();
362
-		$query->update('jobs')
363
-			->set('last_run', $query->createNamedParameter(time(), IQueryBuilder::PARAM_INT))
364
-			->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
365
-
366
-		if ($job instanceof \OCP\BackgroundJob\TimedJob
367
-			&& !$job->isTimeSensitive()) {
368
-			$query->set('time_sensitive', $query->createNamedParameter(IJob::TIME_INSENSITIVE));
369
-		}
370
-
371
-		$query->executeStatement();
372
-	}
373
-
374
-	#[Override]
375
-	public function setExecutionTime(IJob $job, $timeTaken): void {
376
-		$query = $this->connection->getQueryBuilder();
377
-		$query->update('jobs')
378
-			->set('execution_duration', $query->createNamedParameter($timeTaken, IQueryBuilder::PARAM_INT))
379
-			->set('reserved_at', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
380
-			->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
381
-		$query->executeStatement();
382
-	}
383
-
384
-	#[Override]
385
-	public function resetBackgroundJob(IJob $job): void {
386
-		$query = $this->connection->getQueryBuilder();
387
-		$query->update('jobs')
388
-			->set('last_run', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
389
-			->set('reserved_at', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
390
-			->where($query->expr()->eq('id', $query->createNamedParameter($job->getId()), IQueryBuilder::PARAM_INT));
391
-		$query->executeStatement();
392
-	}
393
-
394
-	#[Override]
395
-	public function hasReservedJob(?string $className = null): bool {
396
-		$query = $this->connection->getQueryBuilder();
397
-		$query->select('*')
398
-			->from('jobs')
399
-			->where($query->expr()->gt('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 6 * 3600, IQueryBuilder::PARAM_INT)))
400
-			->setMaxResults(1);
401
-
402
-		if ($className !== null) {
403
-			$query->andWhere($query->expr()->eq('class', $query->createNamedParameter($className)));
404
-		}
405
-
406
-		try {
407
-			$result = $query->executeQuery();
408
-			$hasReservedJobs = $result->fetch() !== false;
409
-			$result->closeCursor();
410
-			return $hasReservedJobs;
411
-		} catch (Exception $e) {
412
-			$this->logger->debug('Querying reserved jobs failed', ['exception' => $e]);
413
-			return false;
414
-		}
415
-	}
416
-
417
-	#[Override]
418
-	public function countByClass(): array {
419
-		$query = $this->connection->getQueryBuilder();
420
-		$query->select('class')
421
-			->selectAlias($query->func()->count('id'), 'count')
422
-			->from('jobs')
423
-			->orderBy('count')
424
-			->groupBy('class');
425
-
426
-		$result = $query->executeQuery();
427
-
428
-		$jobs = [];
429
-
430
-		while (($row = $result->fetch()) !== false) {
431
-			/**
432
-			 * @var array{count:int, class:class-string<IJob>} $row
433
-			 */
434
-			$jobs[] = $row;
435
-		}
436
-
437
-		return $jobs;
438
-	}
29
+    /** @var array<string, string> */
30
+    protected array $alreadyVisitedParallelBlocked = [];
31
+
32
+    public function __construct(
33
+        protected readonly IDBConnection $connection,
34
+        protected readonly IConfig $config,
35
+        protected readonly ITimeFactory $timeFactory,
36
+        protected readonly LoggerInterface $logger,
37
+        protected readonly IGenerator $generator,
38
+    ) {
39
+    }
40
+
41
+    #[Override]
42
+    public function add(IJob|string $job, mixed $argument = null, ?int $firstCheck = null): void {
43
+        if ($firstCheck === null) {
44
+            $firstCheck = $this->timeFactory->getTime();
45
+        }
46
+
47
+        $class = ($job instanceof IJob) ? get_class($job) : $job;
48
+
49
+        $argumentJson = json_encode($argument);
50
+        if (strlen($argumentJson) > 4000) {
51
+            throw new \InvalidArgumentException('Background job arguments can\'t exceed 4000 characters (json encoded)');
52
+        }
53
+
54
+        $query = $this->connection->getQueryBuilder();
55
+        if (!$this->has($job, $argument)) {
56
+            $query->insert('jobs')
57
+                ->values([
58
+                    'id' => $query->createNamedParameter($this->generator->nextId()),
59
+                    'class' => $query->createNamedParameter($class),
60
+                    'argument' => $query->createNamedParameter($argumentJson),
61
+                    'argument_hash' => $query->createNamedParameter(hash('sha256', $argumentJson)),
62
+                    'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT),
63
+                    'last_checked' => $query->createNamedParameter($firstCheck, IQueryBuilder::PARAM_INT),
64
+                ]);
65
+        } else {
66
+            $query->update('jobs')
67
+                ->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT))
68
+                ->set('last_checked', $query->createNamedParameter($firstCheck, IQueryBuilder::PARAM_INT))
69
+                ->set('last_run', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
70
+                ->where($query->expr()->eq('class', $query->createNamedParameter($class)))
71
+                ->andWhere($query->expr()->eq('argument_hash', $query->createNamedParameter(hash('sha256', $argumentJson))));
72
+        }
73
+        $query->executeStatement();
74
+    }
75
+
76
+    public function scheduleAfter(string $job, int $runAfter, mixed $argument = null): void {
77
+        $this->add($job, $argument, $runAfter);
78
+    }
79
+
80
+    #[Override]
81
+    public function remove(IJob|string $job, mixed $argument = null): void {
82
+        $class = ($job instanceof IJob) ? get_class($job) : $job;
83
+
84
+        $query = $this->connection->getQueryBuilder();
85
+        $query->delete('jobs')
86
+            ->where($query->expr()->eq('class', $query->createNamedParameter($class)));
87
+        if (!is_null($argument)) {
88
+            $argumentJson = json_encode($argument);
89
+            $query->andWhere($query->expr()->eq('argument_hash', $query->createNamedParameter(hash('sha256', $argumentJson))));
90
+        }
91
+
92
+        // Add galera safe delete chunking if using mysql
93
+        // Stops us hitting wsrep_max_ws_rows when large row counts are deleted
94
+        if ($this->connection->getDatabaseProvider() === IDBConnection::PLATFORM_MYSQL) {
95
+            // Then use chunked delete
96
+            $max = IQueryBuilder::MAX_ROW_DELETION;
97
+
98
+            $query->setMaxResults($max);
99
+
100
+            do {
101
+                $deleted = $query->executeStatement();
102
+            } while ($deleted === $max);
103
+        } else {
104
+            // Dont use chunked delete - let the DB handle the large row count natively
105
+            $query->executeStatement();
106
+        }
107
+    }
108
+
109
+    #[Override]
110
+    public function removeById(string $id): void {
111
+        $query = $this->connection->getQueryBuilder();
112
+        $query->delete('jobs')
113
+            ->where($query->expr()->eq('id', $query->createNamedParameter($id, IQueryBuilder::PARAM_INT)));
114
+        $query->executeStatement();
115
+    }
116
+
117
+    #[Override]
118
+    public function has(IJob|string $job, mixed $argument): bool {
119
+        $class = ($job instanceof IJob) ? get_class($job) : $job;
120
+        $argument = json_encode($argument);
121
+
122
+        $query = $this->connection->getQueryBuilder();
123
+        $query->select('id')
124
+            ->from('jobs')
125
+            ->where($query->expr()->eq('class', $query->createNamedParameter($class)))
126
+            ->andWhere($query->expr()->eq('argument_hash', $query->createNamedParameter(hash('sha256', $argument))))
127
+            ->setMaxResults(1);
128
+
129
+        $result = $query->executeQuery();
130
+        $row = $result->fetch();
131
+        $result->closeCursor();
132
+
133
+        return (bool)$row;
134
+    }
135
+
136
+    #[Override]
137
+    public function getJobs(IJob|string|null $job, ?int $limit, int $offset): array {
138
+        $iterable = $this->getJobsIterator($job, $limit, $offset);
139
+        return (is_array($iterable))
140
+            ? $iterable
141
+            : iterator_to_array($iterable);
142
+    }
143
+
144
+    #[Override]
145
+    public function getJobsIterator(IJob|string|null $job, ?int $limit, int $offset): iterable {
146
+        $query = $this->connection->getQueryBuilder();
147
+        $query->select('*')
148
+            ->from('jobs')
149
+            ->setMaxResults($limit)
150
+            ->setFirstResult($offset);
151
+
152
+        if ($job !== null) {
153
+            $class = ($job instanceof IJob) ? get_class($job) : $job;
154
+            $query->where($query->expr()->eq('class', $query->createNamedParameter($class)));
155
+        }
156
+
157
+        $result = $query->executeQuery();
158
+
159
+        while ($row = $result->fetch()) {
160
+            $job = $this->buildJob($row);
161
+            if ($job) {
162
+                yield $job;
163
+            }
164
+        }
165
+        $result->closeCursor();
166
+    }
167
+
168
+    #[Override]
169
+    public function getNext(bool $onlyTimeSensitive = false, ?array $jobClasses = null): ?IJob {
170
+        $query = $this->connection->getQueryBuilder();
171
+        $query->select('*')
172
+            ->from('jobs')
173
+            ->where($query->expr()->lte('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 12 * 3600, IQueryBuilder::PARAM_INT)))
174
+            ->andWhere($query->expr()->lte('last_checked', $query->createNamedParameter($this->timeFactory->getTime(), IQueryBuilder::PARAM_INT)))
175
+            ->orderBy('last_checked', 'ASC')
176
+            ->setMaxResults(1);
177
+
178
+        if ($onlyTimeSensitive) {
179
+            $query->andWhere($query->expr()->eq('time_sensitive', $query->createNamedParameter(IJob::TIME_SENSITIVE, IQueryBuilder::PARAM_INT)));
180
+        }
181
+
182
+        if (!empty($jobClasses)) {
183
+            $orClasses = [];
184
+            foreach ($jobClasses as $jobClass) {
185
+                $orClasses[] = $query->expr()->eq('class', $query->createNamedParameter($jobClass, IQueryBuilder::PARAM_STR));
186
+            }
187
+            $query->andWhere($query->expr()->orX(...$orClasses));
188
+        }
189
+
190
+        $result = $query->executeQuery();
191
+        $row = $result->fetch();
192
+        $result->closeCursor();
193
+
194
+        if ($row) {
195
+            $job = $this->buildJob($row);
196
+
197
+            if ($job instanceof IParallelAwareJob && !$job->getAllowParallelRuns() && $this->hasReservedJob(get_class($job))) {
198
+                if (!isset($this->alreadyVisitedParallelBlocked[get_class($job)])) {
199
+                    $this->alreadyVisitedParallelBlocked[get_class($job)] = $job->getId();
200
+                } elseif ($this->alreadyVisitedParallelBlocked[get_class($job)] === $job->getId()) {
201
+                    $this->logger->info('Skipped through all jobs and revisited a IParallelAwareJob blocked job again, giving up.', ['app' => 'cron']);
202
+                    return null;
203
+                }
204
+                $this->logger->info('Skipping ' . get_class($job) . ' job with ID ' . $job->getId() . ' because another job with the same class is already running', ['app' => 'cron']);
205
+
206
+                $update = $this->connection->getQueryBuilder();
207
+                $update->update('jobs')
208
+                    ->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime() + 1))
209
+                    ->where($update->expr()->eq('id', $update->createParameter('jobid')));
210
+                $update->setParameter('jobid', $row['id']);
211
+                $update->executeStatement();
212
+
213
+                return $this->getNext($onlyTimeSensitive, $jobClasses);
214
+            }
215
+
216
+            if ($job !== null && isset($this->alreadyVisitedParallelBlocked[get_class($job)])) {
217
+                unset($this->alreadyVisitedParallelBlocked[get_class($job)]);
218
+            }
219
+
220
+            if ($job instanceof \OCP\BackgroundJob\TimedJob) {
221
+                $now = $this->timeFactory->getTime();
222
+                $nextPossibleRun = $job->getLastRun() + $job->getInterval();
223
+                if ($now < $nextPossibleRun) {
224
+                    // This job is not ready for execution yet. Set timestamps to the future to avoid
225
+                    // re-checking with every cron run.
226
+                    // To avoid bugs that lead to jobs never executing again, the future timestamp is
227
+                    // capped at two days.
228
+                    $nextCheck = min($nextPossibleRun, $now + 48 * 3600);
229
+                    $updateTimedJob = $this->connection->getQueryBuilder();
230
+                    $updateTimedJob->update('jobs')
231
+                        ->set('last_checked', $updateTimedJob->createNamedParameter($nextCheck, IQueryBuilder::PARAM_INT))
232
+                        ->where($updateTimedJob->expr()->eq('id', $updateTimedJob->createParameter('jobid')));
233
+                    $updateTimedJob->setParameter('jobid', $row['id']);
234
+                    $updateTimedJob->executeStatement();
235
+
236
+                    return $this->getNext($onlyTimeSensitive, $jobClasses);
237
+                }
238
+            }
239
+
240
+            $update = $this->connection->getQueryBuilder();
241
+            $update->update('jobs')
242
+                ->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime()))
243
+                ->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime()))
244
+                ->where($update->expr()->eq('id', $update->createParameter('jobid')))
245
+                ->andWhere($update->expr()->eq('reserved_at', $update->createParameter('reserved_at')))
246
+                ->andWhere($update->expr()->eq('last_checked', $update->createParameter('last_checked')));
247
+            $update->setParameter('jobid', $row['id']);
248
+            $update->setParameter('reserved_at', $row['reserved_at']);
249
+            $update->setParameter('last_checked', $row['last_checked']);
250
+            $count = $update->executeStatement();
251
+
252
+            if ($count === 0) {
253
+                // Background job already executed elsewhere, try again.
254
+                return $this->getNext($onlyTimeSensitive, $jobClasses);
255
+            }
256
+
257
+            if ($job === null) {
258
+                // set the last_checked to 12h in the future to not check failing jobs all over again
259
+                $reset = $this->connection->getQueryBuilder();
260
+                $reset->update('jobs')
261
+                    ->set('reserved_at', $reset->expr()->literal(0, IQueryBuilder::PARAM_INT))
262
+                    ->set('last_checked', $reset->createNamedParameter($this->timeFactory->getTime() + 12 * 3600, IQueryBuilder::PARAM_INT))
263
+                    ->where($reset->expr()->eq('id', $reset->createNamedParameter($row['id'], IQueryBuilder::PARAM_INT)));
264
+                $reset->executeStatement();
265
+
266
+                // Background job from disabled app, try again.
267
+                return $this->getNext($onlyTimeSensitive, $jobClasses);
268
+            }
269
+
270
+            return $job;
271
+        } else {
272
+            return null;
273
+        }
274
+    }
275
+
276
+    #[Override]
277
+    public function getById(string $id): ?IJob {
278
+        $row = $this->getDetailsById($id);
279
+
280
+        if ($row) {
281
+            return $this->buildJob($row);
282
+        }
283
+
284
+        return null;
285
+    }
286
+
287
+    #[Override]
288
+    public function getDetailsById(string $id): ?array {
289
+        $query = $this->connection->getQueryBuilder();
290
+        $query->select('*')
291
+            ->from('jobs')
292
+            ->where($query->expr()->eq('id', $query->createNamedParameter($id, IQueryBuilder::PARAM_INT)));
293
+        $result = $query->executeQuery();
294
+        $row = $result->fetch();
295
+        $result->closeCursor();
296
+
297
+        if ($row) {
298
+            return $row;
299
+        }
300
+
301
+        return null;
302
+    }
303
+
304
+    /**
305
+     * get the job object from a row in the db
306
+     *
307
+     * @param array{class:class-string<IJob>, id:mixed, last_run:mixed, argument:string} $row
308
+     * @return ?IJob the next job to run. Beware that this object may be a singleton and may be modified by the next call to buildJob.
309
+     */
310
+    private function buildJob(array $row): ?IJob {
311
+        try {
312
+            try {
313
+                // Try to load the job as a service
314
+                /** @var IJob $job */
315
+                $job = \OCP\Server::get($row['class']);
316
+            } catch (ContainerExceptionInterface $e) {
317
+                if (class_exists($row['class'])) {
318
+                    $class = $row['class'];
319
+                    $job = new $class();
320
+                } else {
321
+                    $this->logger->warning('failed to create instance of background job: ' . $row['class'], ['app' => 'cron', 'exception' => $e]);
322
+                    // Remove job from disabled app or old version of an app
323
+                    $this->removeById($row['id']);
324
+                    return null;
325
+                }
326
+            }
327
+
328
+            if (!($job instanceof IJob)) {
329
+                // This most likely means an invalid job was enqueued. We can ignore it.
330
+                return null;
331
+            }
332
+            $job->setId($row['id']);
333
+            $job->setLastRun((int)$row['last_run']);
334
+            $job->setArgument(json_decode($row['argument'], true));
335
+            return $job;
336
+        } catch (AutoloadNotAllowedException $e) {
337
+            // job is from a disabled app, ignore
338
+            return null;
339
+        }
340
+    }
341
+
342
+    /**
343
+     * set the job that was last ran
344
+     */
345
+    public function setLastJob(IJob $job): void {
346
+        $this->unlockJob($job);
347
+        $this->config->setAppValue('backgroundjob', 'lastjob', $job->getId());
348
+    }
349
+
350
+    #[Override]
351
+    public function unlockJob(IJob $job): void {
352
+        $query = $this->connection->getQueryBuilder();
353
+        $query->update('jobs')
354
+            ->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT))
355
+            ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
356
+        $query->executeStatement();
357
+    }
358
+
359
+    #[Override]
360
+    public function setLastRun(IJob $job): void {
361
+        $query = $this->connection->getQueryBuilder();
362
+        $query->update('jobs')
363
+            ->set('last_run', $query->createNamedParameter(time(), IQueryBuilder::PARAM_INT))
364
+            ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
365
+
366
+        if ($job instanceof \OCP\BackgroundJob\TimedJob
367
+            && !$job->isTimeSensitive()) {
368
+            $query->set('time_sensitive', $query->createNamedParameter(IJob::TIME_INSENSITIVE));
369
+        }
370
+
371
+        $query->executeStatement();
372
+    }
373
+
374
+    #[Override]
375
+    public function setExecutionTime(IJob $job, $timeTaken): void {
376
+        $query = $this->connection->getQueryBuilder();
377
+        $query->update('jobs')
378
+            ->set('execution_duration', $query->createNamedParameter($timeTaken, IQueryBuilder::PARAM_INT))
379
+            ->set('reserved_at', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
380
+            ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
381
+        $query->executeStatement();
382
+    }
383
+
384
+    #[Override]
385
+    public function resetBackgroundJob(IJob $job): void {
386
+        $query = $this->connection->getQueryBuilder();
387
+        $query->update('jobs')
388
+            ->set('last_run', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
389
+            ->set('reserved_at', $query->createNamedParameter(0, IQueryBuilder::PARAM_INT))
390
+            ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId()), IQueryBuilder::PARAM_INT));
391
+        $query->executeStatement();
392
+    }
393
+
394
+    #[Override]
395
+    public function hasReservedJob(?string $className = null): bool {
396
+        $query = $this->connection->getQueryBuilder();
397
+        $query->select('*')
398
+            ->from('jobs')
399
+            ->where($query->expr()->gt('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 6 * 3600, IQueryBuilder::PARAM_INT)))
400
+            ->setMaxResults(1);
401
+
402
+        if ($className !== null) {
403
+            $query->andWhere($query->expr()->eq('class', $query->createNamedParameter($className)));
404
+        }
405
+
406
+        try {
407
+            $result = $query->executeQuery();
408
+            $hasReservedJobs = $result->fetch() !== false;
409
+            $result->closeCursor();
410
+            return $hasReservedJobs;
411
+        } catch (Exception $e) {
412
+            $this->logger->debug('Querying reserved jobs failed', ['exception' => $e]);
413
+            return false;
414
+        }
415
+    }
416
+
417
+    #[Override]
418
+    public function countByClass(): array {
419
+        $query = $this->connection->getQueryBuilder();
420
+        $query->select('class')
421
+            ->selectAlias($query->func()->count('id'), 'count')
422
+            ->from('jobs')
423
+            ->orderBy('count')
424
+            ->groupBy('class');
425
+
426
+        $result = $query->executeQuery();
427
+
428
+        $jobs = [];
429
+
430
+        while (($row = $result->fetch()) !== false) {
431
+            /**
432
+             * @var array{count:int, class:class-string<IJob>} $row
433
+             */
434
+            $jobs[] = $row;
435
+        }
436
+
437
+        return $jobs;
438
+    }
439 439
 }
Please login to merge, or discard this patch.