1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Mpclarkson\ResqueBundle; |
4
|
|
|
|
5
|
|
|
use Psr\Log\NullLogger; |
6
|
|
|
|
7
|
|
|
/**
 * Class Resque
 *
 * Facade over the static php-resque API (\Resque, \ResqueScheduler and
 * \Resque_Worker): it configures the backend, enqueues and removes jobs, and
 * exposes queues, workers, delayed jobs and failed jobs for inspection.
 *
 * @package Mpclarkson\ResqueBundle
 */
class Resque implements EnqueueInterface
{
    /**
     * Options handed to every ContainerAwareJob before it is queued.
     *
     * @var array
     */
    private $kernelOptions;

    /**
     * Values last given to setRedisConfiguration() (host, port, database).
     * Remains unset until that method has been called.
     *
     * @var array
     */
    private $redisConfiguration;

    /**
     * Retry strategy attached to jobs that have no class-specific entry.
     *
     * @var array
     */
    private $globalRetryStrategy = [];

    /**
     * Retry strategies keyed by job class name.
     *
     * @var array
     */
    private $jobRetryStrategy = [];

    /**
     * Resque constructor.
     *
     * @param array $kernelOptions Options forwarded to ContainerAwareJob instances.
     */
    public function __construct(array $kernelOptions)
    {
        $this->kernelOptions = $kernelOptions;
    }

    /**
     * Forward the key prefix to \Resque_Redis.
     *
     * @param $prefix
     */
    public function setPrefix($prefix)
    {
        \Resque_Redis::prefix($prefix);
    }

    /**
     * Replace the retry strategy used for jobs without a class-specific one.
     *
     * @param $strategy
     */
    public function setGlobalRetryStrategy($strategy)
    {
        $this->globalRetryStrategy = $strategy;
    }

    /**
     * Replace the per-class retry strategies (looked up by job class name).
     *
     * @param $strategy
     */
    public function setJobRetryStrategy($strategy)
    {
        $this->jobRetryStrategy = $strategy;
    }

    /**
     * Return the configuration recorded by setRedisConfiguration().
     *
     * @return array
     */
    public function getRedisConfiguration()
    {
        return $this->redisConfiguration;
    }

    /**
     * Record the Redis connection settings and point php-resque at them.
     *
     * A host starting with "/" is handed to the backend unchanged (presumably
     * a Unix socket path); any other host is combined with the port as
     * "host:port".
     *
     * @param $host
     * @param $port
     * @param $database
     */
    public function setRedisConfiguration($host, $port, $database)
    {
        $this->redisConfiguration = [
            'host'     => $host,
            'port'     => $port,
            'database' => $database,
        ];

        $server = $host;
        if (substr($host, 0, 1) != '/') {
            $server = $host . ':' . $port;
        }

        \Resque::setBackend($server, $database);
    }

    /**
     * Enqueue the job unless an equivalent one is already waiting in its queue.
     *
     * A queued job counts as equivalent when its payload class equals the
     * class of $job and every argument value of $job also occurs in the
     * queued job's arguments.
     *
     * When a duplicate is found and $trackStatus is set, a status object for
     * the already-queued job is returned, matching the declared return type
     * (the raw job id string was returned here before).
     *
     * @param Job  $job
     * @param bool $trackStatus
     * @return null|\Resque_Job_Status
     */
    public function enqueueOnce(Job $job, $trackStatus = false)
    {
        $queue = new Queue($job->queue);
        $jobClass = get_class($job);
        $wantedMatches = count($job->args);

        foreach ($queue->getJobs() as $queued) {
            if ($queued->job->payload['class'] != $jobClass) {
                continue;
            }

            if (count(array_intersect($queued->args, $job->args)) != $wantedMatches) {
                continue;
            }

            // Equivalent job already queued: do not enqueue a second copy.
            return $trackStatus ? new \Resque_Job_Status($queued->job->payload['id']) : null;
        }

        return $this->enqueue($job, $trackStatus);
    }

    /**
     * Prepare the job and push it onto its queue.
     *
     * @param Job  $job
     * @param bool $trackStatus
     * @return null|\Resque_Job_Status Status handle when $trackStatus is set
     *                                 and a job id was returned; null otherwise.
     */
    public function enqueue(Job $job, $trackStatus = false)
    {
        $this->prepareJob($job);

        $jobId = \Resque::enqueue($job->queue, \get_class($job), $job->args, $trackStatus);

        // \Resque::enqueue() yields false instead of an id when job creation
        // was cancelled; there is nothing to track in that case.
        if (!$trackStatus || $jobId === false) {
            return null;
        }

        return new \Resque_Job_Status($jobId);
    }

    /**
     * Attach any applicable retry strategy to the job.
     *
     * A strategy registered for the job's class takes precedence, even when
     * it is an empty array; otherwise a non-empty global strategy is used.
     *
     * @param Job $job
     */
    protected function attachRetryStrategy($job)
    {
        $class = get_class($job);

        if (isset($this->jobRetryStrategy[$class])) {
            $job->args['resque.retry_strategy'] = $this->jobRetryStrategy[$class];

            return;
        }

        if (count($this->globalRetryStrategy)) {
            $job->args['resque.retry_strategy'] = $this->globalRetryStrategy;
        }
    }

    /**
     * Schedule the job through \ResqueScheduler::enqueueAt().
     *
     * @param     $at
     * @param Job $job
     * @return null
     */
    public function enqueueAt($at, Job $job)
    {
        $this->prepareJob($job);

        \ResqueScheduler::enqueueAt($at, $job->queue, \get_class($job), $job->args);

        return null;
    }

    /**
     * Schedule the job through \ResqueScheduler::enqueueIn().
     *
     * @param     $in
     * @param Job $job
     * @return null
     */
    public function enqueueIn($in, Job $job)
    {
        $this->prepareJob($job);

        \ResqueScheduler::enqueueIn($in, $job->queue, \get_class($job), $job->args);

        return null;
    }

    /**
     * Remove delayed occurrences of the job via \ResqueScheduler::removeDelayed().
     *
     * The job is prepared exactly as for enqueueing so that the arguments
     * passed to the scheduler include the same kernel options and retry
     * strategy.
     *
     * @param Job $job
     * @return mixed
     */
    public function removedDelayed(Job $job)
    {
        $this->prepareJob($job);

        return \ResqueScheduler::removeDelayed($job->queue, \get_class($job), $job->args);
    }

    /**
     * Remove the delayed job for one timestamp via
     * \ResqueScheduler::removeDelayedJobFromTimestamp().
     *
     * @param     $at
     * @param Job $job
     * @return mixed
     */
    public function removeFromTimestamp($at, Job $job)
    {
        $this->prepareJob($job);

        return \ResqueScheduler::removeDelayedJobFromTimestamp($at, $job->queue, \get_class($job), $job->args);
    }

    /**
     * Wrap every queue name reported by \Resque::queues() in a Queue object.
     *
     * @return array
     */
    public function getQueues()
    {
        $wrap = function ($name) {
            return new Queue($name);
        };

        return \array_map($wrap, \Resque::queues());
    }

    /**
     * Build a Queue object for the given queue name.
     *
     * @param $queue
     * @return Queue
     */
    public function getQueue($queue)
    {
        return new Queue($queue);
    }

    /**
     * Wrap every worker reported by \Resque_Worker::all() in a Worker object.
     *
     * @return array
     */
    public function getWorkers()
    {
        $wrap = function ($resqueWorker) {
            return new Worker($resqueWorker);
        };

        return \array_map($wrap, \Resque_Worker::all());
    }

    /**
     * Look up a single worker by id.
     *
     * @param $id
     * @return Worker|null Null when \Resque_Worker::find() yields a falsy result.
     */
    public function getWorker($id)
    {
        $found = \Resque_Worker::find($id);

        return $found ? new Worker($found) : null;
    }

    /**
     * Prune dead workers using a throwaway \Resque_Worker with a NullLogger.
     *
     * @todo - Clean this up, for now, prune dead workers, just in case
     */
    public function pruneDeadWorkers()
    {
        $pruner = new \Resque_Worker('temp');
        $pruner->setLogger(new NullLogger());
        $pruner->pruneDeadWorkers();
    }

    /**
     * Return the first entry of getDelayedJobTimestamps().
     *
     * @return array|mixed A [timestamp, job count] pair, or [null, 0] when no
     *                     delayed jobs are scheduled.
     */
    public function getFirstDelayedJobTimestamp()
    {
        $schedule = $this->getDelayedJobTimestamps();

        if (count($schedule) === 0) {
            return [null, 0];
        }

        return $schedule[0];
    }

    /**
     * List every scheduled timestamp together with its number of delayed jobs.
     *
     * @return array List of [timestamp, job count] pairs in the order returned
     *               by ZRANGE on "delayed_queue_schedule".
     */
    public function getDelayedJobTimestamps()
    {
        $timestamps = \Resque::redis()->zrange('delayed_queue_schedule', 0, -1);

        //TODO: find a more efficient way to do this (one LLEN call per timestamp)
        $schedule = [];
        foreach ($timestamps as $timestamp) {
            $jobCount = \Resque::redis()->llen('delayed:' . $timestamp);
            $schedule[] = [$timestamp, $jobCount];
        }

        return $schedule;
    }

    /**
     * Return \ResqueScheduler::getDelayedQueueScheduleSize().
     *
     * @return mixed
     */
    public function getNumberOfDelayedJobs()
    {
        return \ResqueScheduler::getDelayedQueueScheduleSize();
    }

    /**
     * Decode every job stored in the "delayed:<timestamp>" list.
     *
     * @param $timestamp
     * @return array JSON-decoded payloads as associative arrays.
     */
    public function getJobsForTimestamp($timestamp)
    {
        $decoded = [];

        foreach (\Resque::redis()->lrange('delayed:' . $timestamp, 0, -1) as $encoded) {
            $decoded[] = json_decode($encoded, true);
        }

        return $decoded;
    }

    /**
     * Delete the "queue:<name>" list.
     *
     * @param $queue
     * @return int Length of the list as read immediately before deletion.
     */
    public function clearQueue($queue)
    {
        $key = 'queue:' . $queue;

        $length = \Resque::redis()->llen($key);
        \Resque::redis()->del($key);

        return $length;
    }

    /**
     * Read a slice of the "failed" list and wrap each entry in a FailedJob.
     *
     * NOTE: despite its name, $count is passed unchanged as the LRANGE stop
     * index, so it denotes an end position rather than a number of elements.
     *
     * @param int $start LRANGE start index.
     * @param int $count LRANGE stop index.
     * @return array
     */
    public function getFailedJobs($start = -100, $count = 100)
    {
        $failed = [];

        foreach (\Resque::redis()->lrange('failed', $start, $count) as $encoded) {
            $failed[] = new FailedJob(json_decode($encoded, true));
        }

        return $failed;
    }

    /**
     * Apply the preparation shared by all enqueue/remove operations: pass the
     * kernel options to container-aware jobs, then attach the retry strategy.
     *
     * @param Job $job
     */
    private function prepareJob(Job $job)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);
    }
}
356
|
|
|
|
This check looks for type mismatches where the missing type is `false`. This is usually indicative of an error condition. Consider, for example, a function that either returns a new `DateTime` object or `false` if there was an error. This is a typical pattern in PHP programming to show that an error has occurred without raising an exception. The calling code should check for this returned `false` before passing on the value to another function or method that may not be able to handle a `false`.