Completed
Push — master ( f5ce61...9abbfe )
by Phil
03:10
created

Resque::retryFailedJobs()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 9
rs 9.6666
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 0
1
<?php
2
3
namespace ResqueBundle\Resque;
4
5
use Psr\Log\NullLogger;
6
7
/**
8
 * Class Resque
9
 * @package ResqueBundle\Resque
10
 */
11
class Resque implements EnqueueInterface
12
{
13
    /**
     * Kernel options handed to every ContainerAwareJob (through setKernelOptions())
     * before the job is queued, scheduled or removed.
     *
     * @var array
     */
16
    private $kernelOptions;
17
18
    /**
     * Redis connection settings with the keys 'host', 'port' and 'database',
     * as recorded by setRedisConfiguration(). Not initialised until that method is called.
     *
     * @var array
     */
21
    private $redisConfiguration;
22
23
    /**
     * Retry strategy attached to jobs whose class has no entry in $jobRetryStrategy.
     * It is only attached when it is non-empty.
     *
     * @var array
     */
26
    private $globalRetryStrategy = [];
27
28
    /**
     * Retry strategies indexed by job class name. An entry for a class takes
     * precedence over the global strategy.
     *
     * @var array
     */
31
    private $jobRetryStrategy = [];
32
33
    /**
     * Stores the kernel options that are later forwarded to container-aware jobs.
     *
     * @param array $kernelOptions Options passed to ContainerAwareJob::setKernelOptions()
     *                             whenever such a job goes through this service
     */
    public function __construct(array $kernelOptions)
    {
        $this->kernelOptions = $kernelOptions;
    }
41
42
    /**
     * Forwards the given prefix to \Resque_Redis::prefix().
     *
     * @param string $prefix Key prefix (namespace) to hand to the Redis layer
     */
    public function setPrefix($prefix)
    {
        \Resque_Redis::prefix($prefix);
    }
49
50
    /**
     * Replaces the retry strategy used for jobs without a class-specific strategy.
     *
     * @param array $strategy Strategy stored as-is; it is counted and attached
     *                        to jobs by attachRetryStrategy()
     */
    public function setGlobalRetryStrategy($strategy)
    {
        $this->globalRetryStrategy = $strategy;
    }
57
58
    /**
     * Replaces the per-job retry strategies.
     *
     * @param array $strategy Map indexed by job class name; attachRetryStrategy()
     *                        looks entries up with get_class($job)
     */
    public function setJobRetryStrategy($strategy)
    {
        $this->jobRetryStrategy = $strategy;
    }
65
66
    /**
     * Returns the connection settings recorded by setRedisConfiguration().
     *
     * @return array Array with the keys 'host', 'port' and 'database'
     */
    public function getRedisConfiguration()
    {
        $configuration = $this->redisConfiguration;

        return $configuration;
    }
73
74
    /**
     * Records the Redis connection settings and configures the \Resque backend with them.
     *
     * @param string $host     Host name, or a value starting with "/" (presumably a unix socket path)
     * @param mixed  $port     Port appended to the host as ":port" unless the host starts with "/"
     * @param mixed  $database Database passed as the second argument to \Resque::setBackend()
     */
    public function setRedisConfiguration($host, $port, $database)
    {
        $this->redisConfiguration = [
            'host'     => $host,
            'port'     => $port,
            'database' => $database,
        ];

        // A host beginning with "/" is handed over untouched; anything else gets ":port" appended.
        $server = $host;
        if (substr($host, 0, 1) != '/') {
            $server = $host . ':' . $port;
        }

        \Resque::setBackend($server, $database);
    }
90
91
    /**
     * Enqueues the job unless a job of the same class with matching arguments
     * is already waiting in the target queue.
     *
     * A queued job matches when its payload class equals the class of $job and
     * every argument of $job is found in the queued job's arguments
     * (array_intersect() compares values by their string representation).
     *
     * @param Job  $job
     * @param bool $trackStatus
     * @return null|string|\Resque_Job_Status The payload id of the already queued job when a match
     *                                        is found and $trackStatus is truthy, null when a match
     *                                        is found without tracking, otherwise the result of enqueue()
     */
    public function enqueueOnce(Job $job, $trackStatus = FALSE)
    {
        $jobClass = get_class($job);
        $queued = (new Queue($job->queue))->getJobs();

        foreach ($queued as $candidate) {
            if ($candidate->job->payload['class'] != $jobClass) {
                continue;
            }

            $shared = array_intersect($candidate->args, $job->args);
            if (count($shared) != count($job->args)) {
                continue;
            }

            // Equivalent job already queued: do not enqueue a second copy.
            return $trackStatus ? $candidate->job->payload['id'] : null;
        }

        return $this->enqueue($job, $trackStatus);
    }
111
112
    /**
     * Pushes the job onto its queue through \Resque::enqueue().
     *
     * Container-aware jobs receive the kernel options first, and any applicable
     * retry strategy is attached to the job arguments before queuing.
     *
     * @param Job  $job
     * @param bool $trackStatus Forwarded to \Resque::enqueue(); also decides whether a status object is returned
     * @return null|\Resque_Job_Status Status object when tracking was requested and
     *                                 \Resque::enqueue() did not return false, null otherwise
     */
    public function enqueue(Job $job, $trackStatus = FALSE)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);

        $token = \Resque::enqueue($job->queue, \get_class($job), $job->args, $trackStatus);

        if (!$trackStatus || $token === false) {
            return null;
        }

        return new \Resque_Job_Status($token);
    }
133
134
    /**
     * Attach any applicable retry strategy to the job.
     *
     * The strategy is written to $job->args['resque.retry_strategy']. A strategy
     * registered for the job's class always wins, even when it is empty; the
     * global strategy is only used when no class entry exists and it is non-empty.
     *
     * NOTE(review): an earlier revision wrapped the per-class assignment in a
     * count() check and then repeated the same assignment unconditionally, which
     * made the check dead code. The effective behaviour (always attach the class
     * entry) is kept here. If an empty class entry was meant to suppress the
     * argument entirely, that needs a deliberate change.
     *
     * @param Job $job
     */
    protected function attachRetryStrategy($job)
    {
        $class = get_class($job);

        if (isset($this->jobRetryStrategy[$class])) {
            $job->args['resque.retry_strategy'] = $this->jobRetryStrategy[$class];

            return;
        }

        if (count($this->globalRetryStrategy)) {
            $job->args['resque.retry_strategy'] = $this->globalRetryStrategy;
        }
    }
152
153
    /**
     * Schedules the job through \ResqueScheduler::enqueueAt().
     *
     * Container-aware jobs receive the kernel options first, and any applicable
     * retry strategy is attached to the job arguments before scheduling.
     *
     * NOTE(review): the code-analysis annotation in this listing flags this
     * method as duplicated elsewhere in the project.
     *
     * @param mixed $at  Passed unchanged to \ResqueScheduler::enqueueAt() as the run time
     * @param Job   $job
     * @return null
     */
    public function enqueueAt($at, Job $job)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);

        $queueName = $job->queue;
        $jobClass = \get_class($job);

        \ResqueScheduler::enqueueAt($at, $queueName, $jobClass, $job->args);

        return null;
    }
170
171
    /**
     * Schedules the job through \ResqueScheduler::enqueueIn().
     *
     * Container-aware jobs receive the kernel options first, and any applicable
     * retry strategy is attached to the job arguments before scheduling.
     *
     * NOTE(review): the code-analysis annotation in this listing flags this
     * method as duplicated elsewhere in the project.
     *
     * @param mixed $in  Passed unchanged to \ResqueScheduler::enqueueIn() as the delay
     * @param Job   $job
     * @return null
     */
    public function enqueueIn($in, Job $job)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);

        $queueName = $job->queue;
        $jobClass = \get_class($job);

        \ResqueScheduler::enqueueIn($in, $queueName, $jobClass, $job->args);

        return null;
    }
188
189
    /**
     * Removes the delayed job through \ResqueScheduler::removeDelayed().
     *
     * The job is prepared exactly as it is when scheduled (kernel options for
     * container-aware jobs, then the retry strategy), presumably so that the
     * arguments handed to the scheduler match the ones stored at scheduling time.
     *
     * NOTE(review): the code-analysis annotation in this listing flags this
     * method as duplicated elsewhere in the project.
     *
     * @param Job $job
     * @return mixed Whatever \ResqueScheduler::removeDelayed() returns
     */
    public function removedDelayed(Job $job)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);

        $removed = \ResqueScheduler::removeDelayed($job->queue, \get_class($job), $job->args);

        return $removed;
    }
203
204
    /**
     * Removes the delayed job from one specific timestamp through
     * \ResqueScheduler::removeDelayedJobFromTimestamp().
     *
     * The job is prepared exactly as it is when scheduled (kernel options for
     * container-aware jobs, then the retry strategy) before the call.
     *
     * NOTE(review): the code-analysis annotation in this listing flags this
     * method as duplicated elsewhere in the project.
     *
     * @param mixed $at  Passed unchanged to the scheduler as the timestamp to remove from
     * @param Job   $job
     * @return mixed Whatever \ResqueScheduler::removeDelayedJobFromTimestamp() returns
     */
    public function removeFromTimestamp($at, Job $job)
    {
        if ($job instanceof ContainerAwareJob) {
            $job->setKernelOptions($this->kernelOptions);
        }

        $this->attachRetryStrategy($job);

        $removed = \ResqueScheduler::removeDelayedJobFromTimestamp($at, $job->queue, \get_class($job), $job->args);

        return $removed;
    }
219
220
    /**
     * Wraps every entry returned by \Resque::queues() in a Queue object.
     *
     * @return Queue[] One Queue per entry, under the same keys as the \Resque::queues() result
     */
    public function getQueues()
    {
        $queues = [];

        foreach (\Resque::queues() as $index => $name) {
            $queues[$index] = new Queue($name);
        }

        return $queues;
    }
229
230
    /**
     * Builds a Queue object for the given queue.
     *
     * @param string $queue Value passed to the Queue constructor
     * @return Queue
     */
    public function getQueue($queue)
    {
        $instance = new Queue($queue);

        return $instance;
    }
238
239
    /**
     * Wraps every entry returned by \Resque_Worker::all() in a Worker object.
     *
     * @return Worker[] One Worker per entry, under the same keys as the \Resque_Worker::all() result
     */
    public function getWorkers()
    {
        $workers = [];

        foreach (\Resque_Worker::all() as $index => $resqueWorker) {
            $workers[$index] = new Worker($resqueWorker);
        }

        return $workers;
    }
248
249
    /**
     * Returns the workers that currently have a job, i.e. whose getCurrentJob() is not null.
     *
     * @return Worker[] Subset of getWorkers(); original array keys are preserved
     */
    public function getRunningWorkers()
    {
        $isBusy = function (Worker $worker) {
            return $worker->getCurrentJob() !== null;
        };

        return array_filter($this->getWorkers(), $isBusy);
    }
258
259
    /**
     * Looks a worker up through \Resque_Worker::find() and wraps it.
     *
     * @param string $id Identifier passed to \Resque_Worker::find()
     * @return Worker|null The wrapped worker, or null when the lookup result is falsy
     */
    public function getWorker($id)
    {
        $found = \Resque_Worker::find($id);

        return $found ? new Worker($found) : null;
    }
273
274
    /**
     * Returns the cardinality of the 'workers' set in Redis.
     *
     * @return int
     */
    public function getNumberOfWorkers()
    {
        $total = \Resque::redis()->scard('workers');

        return $total;
    }
281
282
    /**
     * Counts the workers whose getCurrentJob() is not null.
     *
     * @return int
     */
    public function getNumberOfWorkingWorkers()
    {
        $busy = array_filter($this->getWorkers(), function ($worker) {
            return $worker->getCurrentJob() !== null;
        });

        return count($busy);
    }
296
297
    /**
     * Prunes dead workers by calling pruneDeadWorkers() on a throw-away
     * \Resque_Worker (queue 'temp') that logs to a NullLogger.
     *
     * @todo - Clean this up, for now, prune dead workers, just in case
     */
    public function pruneDeadWorkers()
    {
        $pruner = new \Resque_Worker('temp');
        $pruner->setLogger(new NullLogger());
        $pruner->pruneDeadWorkers();
    }
306
307
    /**
     * Returns the first entry of getDelayedJobTimestamps().
     *
     * @return array A [timestamp, number of jobs] pair, or [null, 0] when nothing is scheduled
     */
    public function getFirstDelayedJobTimestamp()
    {
        $schedule = $this->getDelayedJobTimestamps();

        if (count($schedule) > 0) {
            $first = $schedule[0];

            return $first;
        }

        return [null, 0];
    }
319
320
    /**
     * Lists every timestamp in the 'delayed_queue_schedule' sorted set together
     * with the length of its 'delayed:<timestamp>' list.
     *
     * @return array List of [timestamp, number of jobs] pairs in the order returned by ZRANGE
     */
    public function getDelayedJobTimestamps()
    {
        $schedule = \Resque::redis()->zrange('delayed_queue_schedule', 0, -1);

        //TODO: find a more efficient way to do this (one LLEN round trip per timestamp)
        $pairs = [];
        foreach ($schedule as $scheduledAt) {
            $pending = \Resque::redis()->llen('delayed:' . $scheduledAt);
            $pairs[] = [$scheduledAt, $pending];
        }

        return $pairs;
    }
335
336
    /**
     * Returns \ResqueScheduler::getDelayedQueueScheduleSize().
     *
     * @return mixed Whatever the scheduler reports as the delayed queue schedule size
     */
    public function getNumberOfDelayedJobs()
    {
        $size = \ResqueScheduler::getDelayedQueueScheduleSize();

        return $size;
    }
343
344
    /**
     * Reads every entry of the 'delayed:<timestamp>' list and JSON-decodes it.
     *
     * NOTE(review): the code-analysis annotation in this listing flags this
     * method as duplicated elsewhere in the project.
     *
     * @param mixed $timestamp Appended to 'delayed:' to build the Redis key
     * @return array List of decoded entries (associative arrays; null for entries that fail to decode)
     */
    public function getJobsForTimestamp($timestamp)
    {
        $encoded = \Resque::redis()->lrange('delayed:' . $timestamp, 0, -1);

        $decoded = [];
        foreach ($encoded as $payload) {
            $decoded[] = json_decode($payload, true);
        }

        return $decoded;
    }
358
359
    /**
     * Clears the given queue by delegating to Queue::clear().
     *
     * @param string $queue Value used to build the Queue object
     * @return int Whatever Queue::clear() returns
     */
    public function clearQueue($queue)
    {
        $target = $this->getQueue($queue);

        return $target->clear();
    }
367
368
    /**
     * Returns a window of the 'failed' list as FailedJob objects.
     *
     * Redis LRANGE takes a start and an inclusive stop index, not a length.
     * Previously $count was forwarded as the stop index, so the defaults
     * (-100, 100) asked for "100th from the end up to index 100" and returned
     * a truncated or empty window once the list grew past roughly 100 entries.
     * The stop index is now derived from $start and $count.
     *
     * Backward compatibility: a negative $count is still forwarded unchanged
     * as the LRANGE stop index, so existing calls such as getFailedJobs(0, -1)
     * ("everything") or getFailedJobs(-100, -1) ("last 100") keep working.
     *
     * NOTE(review): the code-analysis annotation in this listing flags this
     * method as duplicated elsewhere in the project.
     *
     * @param int $start Index of the first entry; negative values count from the end of the list
     * @param int $count Maximum number of entries to return; a negative value is treated
     *                   as a raw LRANGE stop index (legacy behaviour)
     * @return FailedJob[]
     */
    public function getFailedJobs($start = -100, $count = 100)
    {
        $start = (int) $start;
        $count = (int) $count;

        if ($count === 0) {
            return [];
        }

        if ($count < 0) {
            // Legacy callers pass a stop index (typically -1 for "through the last entry").
            $stop = $count;
        } else {
            $stop = $start + $count - 1;

            // With a negative start the window is anchored at the tail of the list. If it
            // would run past the last entry the computed stop flips to a non-negative
            // (head-relative) index, so clamp it to the last entry instead.
            if ($start < 0 && $stop >= 0) {
                $stop = -1;
            }
        }

        $jobs = \Resque::redis()->lrange('failed', $start, $stop);

        $result = [];

        foreach ($jobs as $job) {
            $result[] = new FailedJob(json_decode($job, true));
        }

        return $result;
    }
385
386
    /**
     * Returns the length of the 'failed' list in Redis.
     *
     * @return int
     */
    public function getNumberOfFailedJobs()
    {
        $total = \Resque::redis()->llen('failed');

        return $total;
    }
393
394
    /**
     * Re-enqueues every entry of the 'failed' list on its original queue.
     *
     * Each entry is decoded into a FailedJob and pushed again with its queue
     * name, job name and the first element of its arguments. The 'failed' list
     * itself is not modified here.
     *
     * @return int Number of entries read from the 'failed' list
     */
    public function retryFailedJobs()
    {
        $payloads = \Resque::redis()->lrange('failed', 0, -1);

        foreach ($payloads as $payload) {
            $failed = new FailedJob(json_decode($payload, true));

            $queueName = $failed->getQueueName();
            $jobName = $failed->getName();
            $arguments = $failed->getArgs();

            \Resque::enqueue($queueName, $jobName, $arguments[0]);
        }

        return count($payloads);
    }
406
407
    /**
     * Deletes the 'failed' list when it is not empty.
     *
     * @return int Length of the 'failed' list as measured before deletion
     */
    public function clearFailedJobs()
    {
        $redis = \Resque::redis();
        $length = $redis->llen('failed');

        if ($length > 0) {
            \Resque::redis()->del('failed');
        }

        return $length;
    }
419
}
420