resquebundle /
resque
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | |||
| 3 | namespace ResqueBundle\Resque; |
||
| 4 | |||
| 5 | use Psr\Log\NullLogger; |
||
| 6 | |||
| 7 | /** |
||
| 8 | * Class Resque |
||
| 9 | * @package ResqueBundle\Resque |
||
| 10 | */ |
||
| 11 | class Resque implements EnqueueInterface |
||
| 12 | { |
||
| 13 | /** |
||
| 14 | * @var array |
||
| 15 | */ |
||
| 16 | private $kernelOptions; |
||
| 17 | |||
| 18 | /** |
||
| 19 | * @var array |
||
| 20 | */ |
||
| 21 | private $redisConfiguration; |
||
| 22 | |||
| 23 | /** |
||
| 24 | * @var array |
||
| 25 | */ |
||
| 26 | private $globalRetryStrategy = []; |
||
| 27 | |||
| 28 | /** |
||
| 29 | * @var array |
||
| 30 | */ |
||
| 31 | private $jobRetryStrategy = []; |
||
| 32 | |||
/**
 * Create the Resque facade.
 *
 * The options are only stored here; they are later handed to every
 * ContainerAwareJob that passes through one of the enqueue/remove methods.
 *
 * @param array $kernelOptions Options forwarded to container-aware jobs
 */
public function __construct(array $kernelOptions)
{
    $this->kernelOptions = $kernelOptions;
}
||
| 41 | |||
/**
 * Forward the given prefix to \Resque_Redis::prefix().
 *
 * @param string $prefix Prefix value handed to the php-resque Redis wrapper
 *                       (presumably the key namespace; see php-resque docs)
 */
public function setPrefix($prefix)
{
    \Resque_Redis::prefix($prefix);
}
||
| 49 | |||
/**
 * Store the retry strategy used for jobs that have no class-specific entry.
 *
 * @param array $strategy Fallback strategy; attached to a job's args under
 *                        'resque.retry_strategy' when it is non-empty
 */
public function setGlobalRetryStrategy($strategy)
{
    $this->globalRetryStrategy = $strategy;
}
||
| 57 | |||
/**
 * Store the per-job retry strategies.
 *
 * @param array $strategy Map looked up by job class name (as returned by
 *                        get_class()) when a job is enqueued or removed
 */
public function setJobRetryStrategy($strategy)
{
    $this->jobRetryStrategy = $strategy;
}
||
| 65 | |||
/**
 * Return the connection settings recorded by setRedisConfiguration().
 *
 * @return array|null Array with 'host', 'port' and 'database' keys, or null
 *                    when setRedisConfiguration() has not been called yet
 */
public function getRedisConfiguration()
{
    return $this->redisConfiguration;
}
||
| 73 | |||
/**
 * Record the Redis connection settings and configure the php-resque backend.
 *
 * @param string     $host     Host name, or a value starting with "/" (presumably a unix socket path)
 * @param int|string $port     Port appended to the host as ":port" unless the host starts with "/"
 * @param int|string $database Database value passed as second argument to \Resque::setBackend()
 */
public function setRedisConfiguration($host, $port, $database)
{
    // Keep the raw values so getRedisConfiguration() can report them later.
    $this->redisConfiguration = compact('host', 'port', 'database');

    // A host beginning with "/" is used as-is; anything else becomes "host:port".
    if (substr($host, 0, 1) == '/') {
        $server = $host;
    } else {
        $server = $host . ':' . $port;
    }

    \Resque::setBackend($server, $database);
}
||
| 90 | |||
/**
 * Enqueue a job unless an equivalent one is already waiting in its queue.
 *
 * A queued job counts as equivalent when it has the same class and the
 * number of its argument values that also occur in $job->args equals the
 * number of arguments of $job (array_intersect() semantics: values are
 * compared by their string representation, keys are ignored).
 *
 * Note the asymmetric return value: when a duplicate is found the id stored
 * in the queued job's payload is returned, whereas a freshly enqueued job
 * yields whatever enqueue() returns.
 *
 * @param Job  $job         Job to enqueue
 * @param bool $trackStatus Whether the caller wants something to track the job with
 * @return null|string|\Resque_Job_Status Payload id of the already queued job
 *         (duplicate found and $trackStatus truthy), the result of enqueue()
 *         (no duplicate), or null
 */
public function enqueueOnce(Job $job, $trackStatus = false)
{
    $queue = new Queue($job->queue);

    // Loop invariants: neither the job's class nor its argument count changes while scanning.
    $wantedClass = get_class($job);
    $wantedArgCount = count($job->args);

    foreach ($queue->getJobs() as $queued) {
        if ($queued->job->payload['class'] !== $wantedClass) {
            continue;
        }

        if (count(array_intersect($queued->args, $job->args)) === $wantedArgCount) {
            return $trackStatus ? $queued->job->payload['id'] : null;
        }
    }

    return $this->enqueue($job, $trackStatus);
}
||
| 111 | |||
/**
 * Push a job onto its queue for immediate processing.
 *
 * Container-aware jobs receive the kernel options first, then any applicable
 * retry strategy is attached, and finally the job is handed to
 * \Resque::enqueue().
 *
 * @param Job  $job         Job to enqueue
 * @param bool $trackStatus Forwarded to \Resque::enqueue(); also decides whether a status object is returned
 * @return null|\Resque_Job_Status Status object built from the enqueue result when
 *         $trackStatus is truthy and that result is truthy, otherwise null
 */
public function enqueue(Job $job, $trackStatus = false)
{
    if ($job instanceof ContainerAwareJob) {
        $job->setKernelOptions($this->kernelOptions);
    }

    $this->attachRetryStrategy($job);

    $jobId = \Resque::enqueue($job->queue, \get_class($job), $job->args, $trackStatus);

    if (!$trackStatus || !$jobId) {
        return null;
    }

    return new \Resque_Job_Status($jobId);
}
||
| 133 | |||
/**
 * Attach any applicable retry strategy to the job.
 *
 * The strategy is written to $job->args['resque.retry_strategy']. An entry
 * registered for the job's exact class takes precedence and is attached even
 * when it is empty; otherwise the global strategy is attached, but only when
 * it is non-empty. When neither applies the job's args are left untouched.
 *
 * @param Job $job Job whose args are modified in place
 */
protected function attachRetryStrategy($job)
{
    $class = get_class($job);

    if (isset($this->jobRetryStrategy[$class])) {
        // The former inner count() check was redundant: the same assignment
        // followed it unconditionally, so the outcome is unchanged.
        $job->args['resque.retry_strategy'] = $this->jobRetryStrategy[$class];

        return;
    }

    // !empty() equals count() > 0 for arrays but does not throw on a non-countable value.
    if (!empty($this->globalRetryStrategy)) {
        $job->args['resque.retry_strategy'] = $this->globalRetryStrategy;
    }
}
||
| 152 | |||
/**
 * Schedule a job through \ResqueScheduler::enqueueAt().
 *
 * Container-aware jobs get the kernel options and every job gets its retry
 * strategy attached before being handed to the scheduler.
 *
 * @param mixed $at  Point in time, passed unchanged to \ResqueScheduler::enqueueAt()
 * @param Job   $job Job to schedule
 * @return null Always null
 */
public function enqueueAt($at, Job $job)
{
    if ($job instanceof ContainerAwareJob) {
        $job->setKernelOptions($this->kernelOptions);
    }
    $this->attachRetryStrategy($job);

    $jobClass = \get_class($job);
    \ResqueScheduler::enqueueAt($at, $job->queue, $jobClass, $job->args);

    return null;
}
||
| 170 | |||
/**
 * Schedule a job through \ResqueScheduler::enqueueIn().
 *
 * Container-aware jobs get the kernel options and every job gets its retry
 * strategy attached before being handed to the scheduler.
 *
 * @param mixed $in  Delay, passed unchanged to \ResqueScheduler::enqueueIn()
 * @param Job   $job Job to schedule
 * @return null Always null
 */
public function enqueueIn($in, Job $job)
{
    if ($job instanceof ContainerAwareJob) {
        $job->setKernelOptions($this->kernelOptions);
    }
    $this->attachRetryStrategy($job);

    $jobClass = \get_class($job);
    \ResqueScheduler::enqueueIn($in, $job->queue, $jobClass, $job->args);

    return null;
}
||
| 188 | |||
/**
 * Remove a delayed job via \ResqueScheduler::removeDelayed().
 *
 * The job is prepared exactly as it is when scheduled (kernel options for
 * container-aware jobs, then the retry strategy) before its queue, class and
 * args are passed to the scheduler; presumably this makes the args match the
 * stored entry.
 *
 * @param Job $job Job describing the delayed entry to remove
 * @return mixed Whatever \ResqueScheduler::removeDelayed() returns
 */
public function removedDelayed(Job $job)
{
    if ($job instanceof ContainerAwareJob) {
        $job->setKernelOptions($this->kernelOptions);
    }
    $this->attachRetryStrategy($job);

    $removed = \ResqueScheduler::removeDelayed($job->queue, \get_class($job), $job->args);

    return $removed;
}
||
| 203 | |||
/**
 * Remove a delayed job from one timestamp via
 * \ResqueScheduler::removeDelayedJobFromTimestamp().
 *
 * The job is prepared exactly as it is when scheduled (kernel options for
 * container-aware jobs, then the retry strategy) before being passed on.
 *
 * @param mixed $at  Timestamp, passed unchanged to the scheduler
 * @param Job   $job Job describing the delayed entry to remove
 * @return mixed Whatever \ResqueScheduler::removeDelayedJobFromTimestamp() returns
 */
public function removeFromTimestamp($at, Job $job)
{
    if ($job instanceof ContainerAwareJob) {
        $job->setKernelOptions($this->kernelOptions);
    }
    $this->attachRetryStrategy($job);

    $removed = \ResqueScheduler::removeDelayedJobFromTimestamp(
        $at,
        $job->queue,
        \get_class($job),
        $job->args
    );

    return $removed;
}
||
| 219 | |||
/**
 * List the queues reported by \Resque::queues().
 *
 * @return Queue[] One Queue wrapper per entry returned by \Resque::queues()
 */
public function getQueues()
{
    $wrap = function ($queue) {
        return new Queue($queue);
    };

    return \array_map($wrap, \Resque::queues());
}
||
| 229 | |||
/**
 * Build a Queue wrapper for the given queue.
 *
 * No lookup is performed here; the value is only passed to the Queue constructor.
 *
 * @param mixed $queue Value handed to the Queue constructor (presumably the queue name)
 * @return Queue
 */
public function getQueue($queue)
{
    $wrapper = new Queue($queue);

    return $wrapper;
}
||
| 238 | |||
/**
 * List the workers reported by \Resque_Worker::all().
 *
 * @return Worker[] One Worker wrapper per entry returned by \Resque_Worker::all()
 */
public function getWorkers()
{
    $wrap = function ($worker) {
        return new Worker($worker);
    };

    return \array_map($wrap, \Resque_Worker::all());
}
||
| 248 | |||
/**
 * Look up a single worker through \Resque_Worker::find().
 *
 * @param mixed $id Identifier passed to \Resque_Worker::find()
 * @return Worker|null Wrapper around the found worker, or null when find() returns a falsy value
 */
public function getWorker($id)
{
    $found = \Resque_Worker::find($id);

    return $found ? new Worker($found) : null;
}
||
| 263 | |||
/**
 * Ask php-resque to prune dead workers.
 *
 * A throw-away \Resque_Worker for the queue name 'temp' is created only to
 * reach its pruneDeadWorkers() method; it is given a NullLogger beforehand.
 *
 * @todo - Clean this up, for now, prune dead workers, just in case
 */
public function pruneDeadWorkers()
{
    $pruner = new \Resque_Worker('temp');
    $pruner->setLogger(new NullLogger());
    $pruner->pruneDeadWorkers();
}
||
| 273 | |||
/**
 * Return the first entry of getDelayedJobTimestamps().
 *
 * @return array The first [timestamp, job count] pair, or [null, 0] when
 *               there are no delayed timestamps
 */
public function getFirstDelayedJobTimestamp()
{
    $schedule = $this->getDelayedJobTimestamps();

    return count($schedule) > 0 ? $schedule[0] : [null, 0];
}
||
| 286 | |||
/**
 * List every delayed timestamp together with the length of its job list.
 *
 * Reads all members of 'delayed_queue_schedule' with ZRANGE and then issues
 * one LLEN on 'delayed:<timestamp>' per member.
 *
 * @return array[] List of [timestamp, list length] pairs in ZRANGE order
 */
public function getDelayedJobTimestamps()
{
    $scheduled = \Resque::redis()->zrange('delayed_queue_schedule', 0, -1);

    //TODO: find a more efficient way to do this
    $pairs = [];
    foreach ($scheduled as $at) {
        $pending = \Resque::redis()->llen('delayed:' . $at);
        $pairs[] = [$at, $pending];
    }

    return $pairs;
}
||
| 302 | |||
/**
 * Return the result of \ResqueScheduler::getDelayedQueueScheduleSize().
 *
 * @return mixed Value reported by the scheduler (presumably an integer count)
 */
public function getNumberOfDelayedJobs()
{
    $size = \ResqueScheduler::getDelayedQueueScheduleSize();

    return $size;
}
||
| 310 | |||
/**
 * Fetch and decode every job stored under 'delayed:<timestamp>'.
 *
 * @param mixed $timestamp Timestamp appended to the 'delayed:' key
 * @return array List of json_decode()d entries (associative arrays for JSON objects)
 */
public function getJobsForTimestamp($timestamp)
{
    $encodedJobs = \Resque::redis()->lrange('delayed:' . $timestamp, 0, -1);

    $decoded = [];
    foreach ($encodedJobs as $encoded) {
        $decoded[] = json_decode($encoded, true);
    }

    return $decoded;
}
||
| 325 | |||
/**
 * Delete the Redis list 'queue:<queue>' and report how long it was.
 *
 * The length is read with LLEN before the key is deleted; the two commands
 * are issued separately.
 *
 * @param string $queue Queue name appended to the 'queue:' key
 * @return int Length reported by LLEN just before the deletion
 */
public function clearQueue($queue)
{
    $key = 'queue:' . $queue;

    $size = \Resque::redis()->llen($key);
    \Resque::redis()->del($key);

    return $size;
}
||
| 337 | |||
/**
 * Fetch entries from the Redis 'failed' list, wrapped as FailedJob objects.
 *
 * LRANGE expects a start and a *stop* index. Previously $count was passed
 * straight through as the stop index, so the defaults (-100, 100) produced an
 * empty or partial window as soon as the list held more than 100 entries.
 * A non-negative $count is now translated into the matching stop index.
 * A negative $count is still forwarded unchanged as a stop index (e.g. -1 for
 * "through the last entry"), so callers relying on that keep working.
 *
 * @param int $start LRANGE start index; negative values count from the end of the list
 * @param int $count Number of entries to return, or a negative LRANGE stop index
 * @return FailedJob[]
 */
public function getFailedJobs($start = -100, $count = 100)
{
    $start = (int) $start;
    $count = (int) $count;

    if ($count === 0) {
        return [];
    }

    if ($count < 0) {
        // Legacy usage: the second argument already is an LRANGE stop index.
        $stop = $count;
    } else {
        $stop = $start + $count - 1;

        // A window that starts relative to the end must not wrap around to
        // absolute indexes; clamp it to the last element instead.
        if ($start < 0 && $stop >= 0) {
            $stop = -1;
        }
    }

    $jobs = \Resque::redis()->lrange('failed', $start, $stop);

    $result = [];
    foreach ($jobs as $job) {
        $result[] = new FailedJob(json_decode($job, true));
    }

    return $result;
}
||
| 355 | } |
||
| 356 |
In PHP, under loose comparison (like
`==`, or `!=`, or `switch` conditions), values of different types might be equal. For
`string` values, the empty string `''` is a special case; in particular, the following results might be unexpected: