Completed
Push — master ( bb64d4...c4393f )
by Phil
02:36
created

Resque.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace ResqueBundle\Resque;
4
5
use Psr\Log\NullLogger;
6
7
/**
8
 * Class Resque
9
 * @package ResqueBundle\Resque
10
 */
11
class Resque implements EnqueueInterface
12
{
13
    /**
14
     * @var array
15
     */
16
    private $kernelOptions;
17
18
    /**
19
     * @var array
20
     */
21
    private $redisConfiguration;
22
23
    /**
24
     * @var array
25
     */
26
    private $globalRetryStrategy = [];
27
28
    /**
29
     * @var array
30
     */
31
    private $jobRetryStrategy = [];
32
33
    /**
34
     * Resque constructor.
35
     * @param array $kernelOptions
36
     */
37
    public function __construct(array $kernelOptions)
38
    {
39
        $this->kernelOptions = $kernelOptions;
40
    }
41
42
    /**
43
     * @param $prefix
44
     */
45
    public function setPrefix($prefix)
46
    {
47
        \Resque_Redis::prefix($prefix);
48
    }
49
50
    /**
51
     * @param $strategy
52
     */
53
    public function setGlobalRetryStrategy($strategy)
54
    {
55
        $this->globalRetryStrategy = $strategy;
56
    }
57
58
    /**
59
     * @param $strategy
60
     */
61
    public function setJobRetryStrategy($strategy)
62
    {
63
        $this->jobRetryStrategy = $strategy;
64
    }
65
66
    /**
67
     * @return array
68
     */
69
    public function getRedisConfiguration()
70
    {
71
        return $this->redisConfiguration;
72
    }
73
74
    /**
75
     * @param $host
76
     * @param $port
77
     * @param $database
78
     */
79
    public function setRedisConfiguration($host, $port, $database)
80
    {
81
        $this->redisConfiguration = [
82
            'host'     => $host,
83
            'port'     => $port,
84
            'database' => $database,
85
        ];
86
        $host = substr($host, 0, 1) == '/' ? $host : $host . ':' . $port;
87
88
        \Resque::setBackend($host, $database);
89
    }
90
91
    /**
92
     * @param Job $job
93
     * @param bool $trackStatus
94
     * @return null|\Resque_Job_Status
95
     */
96
    public function enqueueOnce(Job $job, $trackStatus = FALSE)
97
    {
98
        $queue = new Queue($job->queue);
99
        $jobs = $queue->getJobs();
100
101
        foreach ($jobs AS $j) {
102
            if ($j->job->payload['class'] == get_class($job)) {
103
                if (count(array_intersect($j->args, $job->args)) == count($job->args)) {
104
                    return ($trackStatus) ? $j->job->payload['id'] : NULL;
105
                }
106
            }
107
        }
108
109
        return $this->enqueue($job, $trackStatus);
110
    }
111
112
    /**
113
     * @param Job $job
114
     * @param bool $trackStatus
115
     * @return null|\Resque_Job_Status
116
     */
117
    public function enqueue(Job $job, $trackStatus = FALSE)
118
    {
119
        if ($job instanceof ContainerAwareJob) {
120
            $job->setKernelOptions($this->kernelOptions);
121
        }
122
123
        $this->attachRetryStrategy($job);
124
125
        $result = \Resque::enqueue($job->queue, \get_class($job), $job->args, $trackStatus);
126
127
        if ($trackStatus && $result) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $result of type false|string is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== false instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
128
            return new \Resque_Job_Status($result);
129
        }
130
131
        return NULL;
132
    }
133
134
    /**
135
     * Attach any applicable retry strategy to the job.
136
     *
137
     * @param Job $job
138
     */
139
    protected function attachRetryStrategy($job)
140
    {
141
        $class = get_class($job);
142
143
        if (isset($this->jobRetryStrategy[$class])) {
144
            if (count($this->jobRetryStrategy[$class])) {
145
                $job->args['resque.retry_strategy'] = $this->jobRetryStrategy[$class];
146
            }
147
            $job->args['resque.retry_strategy'] = $this->jobRetryStrategy[$class];
148
        } elseif (count($this->globalRetryStrategy)) {
149
            $job->args['resque.retry_strategy'] = $this->globalRetryStrategy;
150
        }
151
    }
152
153
    /**
154
     * @param $at
155
     * @param Job $job
156
     * @return null
157
     */
158 View Code Duplication
    public function enqueueAt($at, Job $job)
159
    {
160
        if ($job instanceof ContainerAwareJob) {
161
            $job->setKernelOptions($this->kernelOptions);
162
        }
163
164
        $this->attachRetryStrategy($job);
165
166
        \ResqueScheduler::enqueueAt($at, $job->queue, \get_class($job), $job->args);
167
168
        return NULL;
169
    }
170
171
    /**
172
     * @param $in
173
     * @param Job $job
174
     * @return null
175
     */
176 View Code Duplication
    public function enqueueIn($in, Job $job)
177
    {
178
        if ($job instanceof ContainerAwareJob) {
179
            $job->setKernelOptions($this->kernelOptions);
180
        }
181
182
        $this->attachRetryStrategy($job);
183
184
        \ResqueScheduler::enqueueIn($in, $job->queue, \get_class($job), $job->args);
185
186
        return NULL;
187
    }
188
189
    /**
190
     * @param Job $job
191
     * @return mixed
192
     */
193 View Code Duplication
    public function removedDelayed(Job $job)
194
    {
195
        if ($job instanceof ContainerAwareJob) {
196
            $job->setKernelOptions($this->kernelOptions);
197
        }
198
199
        $this->attachRetryStrategy($job);
200
201
        return \ResqueScheduler::removeDelayed($job->queue, \get_class($job), $job->args);
202
    }
203
204
    /**
205
     * @param $at
206
     * @param Job $job
207
     * @return mixed
208
     */
209 View Code Duplication
    public function removeFromTimestamp($at, Job $job)
210
    {
211
        if ($job instanceof ContainerAwareJob) {
212
            $job->setKernelOptions($this->kernelOptions);
213
        }
214
215
        $this->attachRetryStrategy($job);
216
217
        return \ResqueScheduler::removeDelayedJobFromTimestamp($at, $job->queue, \get_class($job), $job->args);
218
    }
219
220
    /**
221
     * @return array
222
     */
223
    public function getQueues()
224
    {
225
        return \array_map(function ($queue) {
226
            return new Queue($queue);
227
        }, \Resque::queues());
228
    }
229
230
    /**
231
     * @param $queue
232
     * @return Queue
233
     */
234
    public function getQueue($queue)
235
    {
236
        return new Queue($queue);
237
    }
238
239
    /**
240
     * @return array
241
     */
242
    public function getWorkers()
243
    {
244
        return \array_map(function ($worker) {
245
            return new Worker($worker);
246
        }, \Resque_Worker::all());
247
    }
248
249
    /**
250
     * @param $id
251
     * @return Worker|null
252
     */
253
    public function getWorker($id)
254
    {
255
        $worker = \Resque_Worker::find($id);
256
257
        if (!$worker) {
258
            return NULL;
259
        }
260
261
        return new Worker($worker);
262
    }
263
264
    /**
265
     * @todo - Clean this up, for now, prune dead workers, just in case
266
     */
267
    public function pruneDeadWorkers()
268
    {
269
        $worker = new \Resque_Worker('temp');
270
        $worker->setLogger(new NullLogger());
271
        $worker->pruneDeadWorkers();
272
    }
273
274
    /**
275
     * @return array|mixed
276
     */
277
    public function getFirstDelayedJobTimestamp()
278
    {
279
        $timestamps = $this->getDelayedJobTimestamps();
280
        if (count($timestamps) > 0) {
281
            return $timestamps[0];
282
        }
283
284
        return [NULL, 0];
285
    }
286
287
    /**
288
     * @return array
289
     */
290
    public function getDelayedJobTimestamps()
291
    {
292
        $timestamps = \Resque::redis()->zrange('delayed_queue_schedule', 0, -1);
293
294
        //TODO: find a more efficient way to do this
295
        $out = [];
296
        foreach ($timestamps as $timestamp) {
297
            $out[] = [$timestamp, \Resque::redis()->llen('delayed:' . $timestamp)];
298
        }
299
300
        return $out;
301
    }
302
303
    /**
304
     * @return mixed
305
     */
306
    public function getNumberOfDelayedJobs()
307
    {
308
        return \ResqueScheduler::getDelayedQueueScheduleSize();
309
    }
310
311
    /**
312
     * @param $timestamp
313
     * @return array
314
     */
315 View Code Duplication
    public function getJobsForTimestamp($timestamp)
316
    {
317
        $jobs = \Resque::redis()->lrange('delayed:' . $timestamp, 0, -1);
318
        $out = [];
319
        foreach ($jobs as $job) {
320
            $out[] = json_decode($job, TRUE);
321
        }
322
323
        return $out;
324
    }
325
326
    /**
327
     * @param $queue
328
     * @return int
329
     */
330
    public function clearQueue($queue)
331
    {
332
        $length = \Resque::redis()->llen('queue:' . $queue);
333
        \Resque::redis()->del('queue:' . $queue);
334
335
        return $length;
336
    }
337
338
    /**
339
     * @param int $start
340
     * @param int $count
341
     * @return array
342
     */
343 View Code Duplication
    public function getFailedJobs($start = -100, $count = 100)
344
    {
345
        $jobs = \Resque::redis()->lrange('failed', $start, $count);
346
347
        $result = [];
348
349
        foreach ($jobs as $job) {
350
            $result[] = new FailedJob(json_decode($job, TRUE));
351
        }
352
353
        return $result;
354
    }
355
}
356