1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ResqueBundle\Resque; |
4
|
|
|
|
5
|
|
|
use Psr\Log\NullLogger; |
6
|
|
|
|
7
|
|
|
/**
 * Thin service wrapper around the global \Resque, \ResqueScheduler,
 * \Resque_Worker and \Resque_Redis classes.
 *
 * It configures the Redis backend, enqueues jobs (immediately, at a given
 * time, after a delay, or only when not already queued) and exposes helpers
 * to inspect and maintain queues, workers, delayed jobs and failed jobs.
 *
 * @package ResqueBundle\Resque
 */
class Resque implements EnqueueInterface
{
    /**
     * Options passed to every ContainerAwareJob before it is handed to Resque.
     *
     * @var array
     */
    private $kernelOptions;

    /**
     * Values last given to setRedisConfiguration(), keyed by 'host', 'port'
     * and 'database'. Stays null until that method has been called.
     *
     * @var array|null
     */
    private $redisConfiguration;

    /**
     * Retry strategy applied to jobs whose class has no entry in
     * $jobRetryStrategy. Ignored while empty.
     *
     * @var array
     */
    private $globalRetryStrategy = [];

    /**
     * Retry strategies keyed by job class name.
     *
     * @var array
     */
    private $jobRetryStrategy = [];

    /**
     * Resque constructor.
     *
     * @param array $kernelOptions Options forwarded to ContainerAwareJob::setKernelOptions().
     */
    public function __construct(array $kernelOptions)
    {
        $this->kernelOptions = $kernelOptions;
    }

    /**
     * Forwards the given key prefix to \Resque_Redis::prefix().
     *
     * @param $prefix
     */
    public function setPrefix($prefix)
    {
        \Resque_Redis::prefix($prefix);
    }

    /**
     * Sets the retry strategy used for jobs without a class-specific one.
     *
     * @param array $strategy
     */
    public function setGlobalRetryStrategy($strategy)
    {
        $this->globalRetryStrategy = $strategy;
    }

    /**
     * Sets the class-specific retry strategies.
     *
     * @param array $strategy Map of job class name => retry strategy.
     */
    public function setJobRetryStrategy($strategy)
    {
        $this->jobRetryStrategy = $strategy;
    }

    /**
     * Returns the configuration recorded by setRedisConfiguration().
     *
     * @return array|null Null when setRedisConfiguration() has not been called yet.
     */
    public function getRedisConfiguration()
    {
        return $this->redisConfiguration;
    }

    /**
     * Records the Redis connection settings and points \Resque at that backend.
     *
     * @param $host     Host name, or a path starting with "/" (used without a port).
     * @param $port     Port appended to the host as "host:port".
     * @param $database Database passed through to \Resque::setBackend().
     */
    public function setRedisConfiguration($host, $port, $database)
    {
        $this->redisConfiguration = [
            'host'     => $host,
            'port'     => $port,
            'database' => $database,
        ];

        // A host beginning with "/" is passed on untouched; anything else gets ":port" appended.
        $server = substr($host, 0, 1) === '/' ? $host : $host . ':' . $port;

        \Resque::setBackend($server, $database);
    }

    /**
     * Enqueues the job unless an equivalent job is already waiting in its queue.
     *
     * A queued job counts as equivalent when its payload class equals the class
     * of $job and the number of its args that array_intersect() finds in
     * $job->args equals count($job->args).
     *
     * NOTE(review): array_intersect() compares values only (by string
     * representation), so nested-array args and repeated values can skew the
     * match - confirm this heuristic is sufficient for the jobs in use.
     *
     * @param Job  $job
     * @param bool $trackStatus
     * @return null|mixed|\Resque_Job_Status The result of enqueue() when the job is new. When a
     *                                       duplicate is found: the queued job's payload 'id'
     *                                       if $trackStatus is truthy, otherwise null.
     */
    public function enqueueOnce(Job $job, $trackStatus = false)
    {
        $queue     = new Queue($job->queue);
        $jobClass  = get_class($job);
        $argsCount = count($job->args);

        foreach ($queue->getJobs() as $queued) {
            if ($queued->job->payload['class'] != $jobClass) {
                continue;
            }

            if (count(array_intersect($queued->args, $job->args)) === $argsCount) {
                return $trackStatus ? $queued->job->payload['id'] : null;
            }
        }

        return $this->enqueue($job, $trackStatus);
    }

    /**
     * Pushes the job onto its queue via \Resque::enqueue().
     *
     * @param Job  $job
     * @param bool $trackStatus
     * @return null|\Resque_Job_Status A status object when $trackStatus is truthy and
     *                                 \Resque::enqueue() did not return false; null otherwise.
     */
    public function enqueue(Job $job, $trackStatus = false)
    {
        $this->prepareJob($job);

        $result = \Resque::enqueue($job->queue, \get_class($job), $job->args, $trackStatus);

        if ($trackStatus && $result !== false) {
            return new \Resque_Job_Status($result);
        }

        return null;
    }

    /**
     * Attach any applicable retry strategy to the job.
     *
     * A strategy registered for the job's class always takes precedence over
     * the global one, even when it is an empty array. The global strategy is
     * only attached when it is non-empty.
     *
     * @param Job $job
     */
    protected function attachRetryStrategy($job)
    {
        $class = get_class($job);

        if (isset($this->jobRetryStrategy[$class])) {
            $job->args['resque.retry_strategy'] = $this->jobRetryStrategy[$class];

            return;
        }

        if (count($this->globalRetryStrategy)) {
            $job->args['resque.retry_strategy'] = $this->globalRetryStrategy;
        }
    }

    /**
     * Schedules the job via \ResqueScheduler::enqueueAt().
     *
     * @param $at Passed through unchanged as the scheduler's first argument.
     * @param Job $job
     * @return null
     */
    public function enqueueAt($at, Job $job)
    {
        $this->prepareJob($job);

        \ResqueScheduler::enqueueAt($at, $job->queue, \get_class($job), $job->args);

        return null;
    }

    /**
     * Schedules the job via \ResqueScheduler::enqueueIn().
     *
     * @param $in Passed through unchanged as the scheduler's first argument.
     * @param Job $job
     * @return null
     */
    public function enqueueIn($in, Job $job)
    {
        $this->prepareJob($job);

        \ResqueScheduler::enqueueIn($in, $job->queue, \get_class($job), $job->args);

        return null;
    }

    /**
     * Removes the job via \ResqueScheduler::removeDelayed().
     *
     * The job is prepared exactly as it is before enqueueing - presumably so
     * its args match what was stored; verify against the scheduler.
     *
     * @param Job $job
     * @return mixed Whatever \ResqueScheduler::removeDelayed() returns.
     */
    public function removedDelayed(Job $job)
    {
        $this->prepareJob($job);

        return \ResqueScheduler::removeDelayed($job->queue, \get_class($job), $job->args);
    }

    /**
     * Removes the job via \ResqueScheduler::removeDelayedJobFromTimestamp().
     *
     * @param $at Passed through unchanged as the scheduler's first argument.
     * @param Job $job
     * @return mixed Whatever \ResqueScheduler::removeDelayedJobFromTimestamp() returns.
     */
    public function removeFromTimestamp($at, Job $job)
    {
        $this->prepareJob($job);

        return \ResqueScheduler::removeDelayedJobFromTimestamp($at, $job->queue, \get_class($job), $job->args);
    }

    /**
     * Wraps every entry of \Resque::queues() in a Queue object.
     *
     * @return Queue[]
     */
    public function getQueues()
    {
        return \array_map(static function ($name) {
            return new Queue($name);
        }, \Resque::queues());
    }

    /**
     * Builds a Queue object for the given name.
     *
     * @param $queue
     * @return Queue
     */
    public function getQueue($queue)
    {
        return new Queue($queue);
    }

    /**
     * Wraps every entry of \Resque_Worker::all() in a Worker object.
     *
     * @return Worker[]
     */
    public function getWorkers()
    {
        return \array_map(static function ($resqueWorker) {
            return new Worker($resqueWorker);
        }, \Resque_Worker::all());
    }

    /**
     * Returns the workers whose current job is not null.
     *
     * The array keys of getWorkers() are preserved, so the result may have gaps.
     *
     * @return Worker[]
     */
    public function getRunningWorkers()
    {
        return array_filter($this->getWorkers(), static function (Worker $worker) {
            return $worker->getCurrentJob() !== null;
        });
    }

    /**
     * Looks a worker up through \Resque_Worker::find().
     *
     * @param $id
     * @return Worker|null Null when the lookup yields a falsy value.
     */
    public function getWorker($id)
    {
        $found = \Resque_Worker::find($id);

        return $found ? new Worker($found) : null;
    }

    /**
     * Returns the cardinality of the Redis 'workers' set.
     *
     * @return int
     */
    public function getNumberOfWorkers()
    {
        return \Resque::redis()->scard('workers');
    }

    /**
     * Counts the workers whose current job is not null.
     *
     * @return int
     */
    public function getNumberOfWorkingWorkers()
    {
        return count($this->getRunningWorkers());
    }

    /**
     * Runs \Resque_Worker::pruneDeadWorkers() on a throw-away worker that
     * logs to a NullLogger.
     *
     * @todo - Clean this up, for now, prune dead workers, just in case
     */
    public function pruneDeadWorkers()
    {
        $worker = new \Resque_Worker('temp');
        $worker->setLogger(new NullLogger());
        $worker->pruneDeadWorkers();
    }

    /**
     * Returns the first entry of getDelayedJobTimestamps().
     *
     * @return array A [timestamp, job count] pair, or [null, 0] when there are no entries.
     */
    public function getFirstDelayedJobTimestamp()
    {
        $timestamps = $this->getDelayedJobTimestamps();

        if (count($timestamps) === 0) {
            return [null, 0];
        }

        return $timestamps[0];
    }

    /**
     * Lists every member of the 'delayed_queue_schedule' sorted set together
     * with the length of its 'delayed:<timestamp>' list.
     *
     * @return array List of [timestamp, list length] pairs, in ZRANGE order.
     */
    public function getDelayedJobTimestamps()
    {
        $redis      = \Resque::redis();
        $timestamps = $redis->zrange('delayed_queue_schedule', 0, -1);

        //TODO: find a more efficient way to do this (one LLEN round trip per timestamp)
        $pairs = [];
        foreach ($timestamps as $timestamp) {
            $pairs[] = [$timestamp, $redis->llen('delayed:' . $timestamp)];
        }

        return $pairs;
    }

    /**
     * Returns \ResqueScheduler::getDelayedQueueScheduleSize().
     *
     * @return mixed
     */
    public function getNumberOfDelayedJobs()
    {
        return \ResqueScheduler::getDelayedQueueScheduleSize();
    }

    /**
     * Returns the JSON-decoded entries of the 'delayed:<timestamp>' list.
     *
     * @param $timestamp
     * @return array One decoded (associative) entry per list item.
     */
    public function getJobsForTimestamp($timestamp)
    {
        $decoded = [];
        foreach (\Resque::redis()->lrange('delayed:' . $timestamp, 0, -1) as $raw) {
            $decoded[] = json_decode($raw, true);
        }

        return $decoded;
    }

    /**
     * Clears the named queue.
     *
     * @param $queue
     * @return int Whatever Queue::clear() returns.
     */
    public function clearQueue($queue)
    {
        return $this->getQueue($queue)->clear();
    }

    /**
     * Returns a window of the Redis 'failed' list as FailedJob objects.
     *
     * Redis LRANGE expects an inclusive stop index, not an element count, so
     * the stop index is derived from $start and $count. Passing $count straight
     * through (as was done previously) returned a wrong or empty window once
     * the list grew beyond roughly $count entries.
     *
     * @param int $start Index of the first entry; negative values count from the end of the list.
     * @param int $count Maximum number of entries to return. A negative value is forwarded
     *                   unchanged as the LRANGE stop index (e.g. -1 for "to the end").
     * @return FailedJob[]
     */
    public function getFailedJobs($start = -100, $count = 100)
    {
        $start = (int) $start;
        $count = (int) $count;

        if ($count === 0) {
            return [];
        }

        if ($count < 0) {
            // A negative "count" can only be meaningful as a raw stop index; keep that usage working.
            $stop = $count;
        } else {
            $stop = $start + $count - 1;

            // A window starting at a negative index must not wrap round to the head of the list.
            if ($start < 0 && $stop >= 0) {
                $stop = -1;
            }
        }

        $failedJobs = [];
        foreach (\Resque::redis()->lrange('failed', $start, $stop) as $raw) {
            $failedJobs[] = new FailedJob(json_decode($raw, true));
        }

        return $failedJobs;
    }

    /**
     * Returns the length of the Redis 'failed' list.
     *
     * @return int
     */
    public function getNumberOfFailedJobs()
    {
        return \Resque::redis()->llen('failed');
    }

    /**
     * Re-enqueues every entry of the Redis 'failed' list.
     *
     * Each entry is enqueued with the first element of its args. The entries
     * are not removed from the 'failed' list by this method.
     *
     * @return int Number of entries that were re-enqueued.
     */
    public function retryFailedJobs()
    {
        $entries = \Resque::redis()->lrange('failed', 0, -1);

        foreach ($entries as $raw) {
            $failedJob = new FailedJob(json_decode($raw, true));
            \Resque::enqueue($failedJob->getQueueName(), $failedJob->getName(), $failedJob->getArgs()[0]);
        }

        return count($entries);
    }

    /**
     * Deletes the Redis 'failed' list when it is not empty.
     *
     * @return int Length of the list before it was deleted.
     */
    public function clearFailedJobs()
    {
        $redis  = \Resque::redis();
        $length = $redis->llen('failed');

        if ($length > 0) {
            $redis->del('failed');
        }

        return $length;
    }

    /**
     * Applies the preparation shared by every enqueue/remove operation:
     * kernel options for container-aware jobs, then the retry strategy.
     *
     * @param Job $job
     */
    private function prepareJob(Job $job)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);
    }
}
420
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.