Completed
Pull Request — master (#13)
by Marijan
17:08
created

Queue::addForObject()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 28
Code Lines 16

Duplication

Lines 28
Ratio 100 %

Code Coverage

Tests 20
CRAP Score 3

Importance

Changes 0
Metric Value
dl 28
loc 28
rs 8.8571
c 0
b 0
f 0
ccs 20
cts 20
cp 1
cc 3
eloc 16
nc 3
nop 5
crap 3
1
<?php
2
3
namespace TreeHouse\WorkerBundle;
4
5
use InvalidArgumentException;
6
use Pheanstalk\Exception;
7
use Pheanstalk\Job;
8
use Pheanstalk\PheanstalkInterface;
9
use Psr\Log\LoggerInterface;
10
use Psr\Log\LogLevel;
11
use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface;
12
13
class Queue
14
{
15
    /**
16
     * @var PheanstalkInterface
17
     */
18
    private $pheanstalk;
19
    /**
20
     * @var LoggerInterface
21
     */
22
    private $logger;
23
    /**
24
     * @var int
25
     */
26
    private $defaultTtr = PheanstalkInterface::DEFAULT_TTR;
27
    /**
28
     * @var ExecutorPool
29
     */
30
    private $executorPool;
31
32
    /**
     * Wires the queue to its collaborators.
     *
     * @param PheanstalkInterface $pheanstalk   Connection used for all tube and job operations
     * @param ExecutorPool        $executorPool Consulted to check whether an action has an executor
     * @param LoggerInterface     $logger       Receives the messages this class logs
     */
    public function __construct(PheanstalkInterface $pheanstalk, ExecutorPool $executorPool, LoggerInterface $logger)
    {
        $this->executorPool = $executorPool;
        $this->logger = $logger;
        $this->pheanstalk = $pheanstalk;
    }
46
47
    /**
     * Exposes the Pheanstalk connection this queue was constructed with.
     *
     * @return PheanstalkInterface
     *
     * @deprecated Removed in next major version
     */
    public function getPheanstalk()
    {
        return $this->pheanstalk;
    }
56
57
    /**
     * Exposes the logger this queue was constructed with.
     *
     * @return LoggerInterface
     *
     * @deprecated Removed in next major version
     */
    public function getLogger()
    {
        return $this->logger;
    }
66
67
    /**
     * Overrides the time-to-run that add() applies when no explicit `$ttr`
     * is passed.
     *
     * @param int $defaultTtr Time to run, in seconds
     *
     * @return $this Same instance, to allow chaining
     */
    public function setDefaultTtr($defaultTtr)
    {
        $this->defaultTtr = $defaultTtr;

        return $this;
    }
78
79
    /**
     * Fetches the tube statistics for an action.
     *
     * @param string $action The action (used as tube name)
     *
     * @throws Exception Any Pheanstalk exception whose message does not contain "NOT_FOUND"
     *
     * @return array|null The result of Pheanstalk's statsTube() call, or null
     *                    when Pheanstalk reports the tube as NOT_FOUND
     */
    public function getActionStats($action)
    {
        try {
            $stats = $this->pheanstalk->statsTube($action);
        } catch (Exception $exception) {
            $tubeIsMissing = strpos($exception->getMessage(), 'NOT_FOUND') !== false;

            if (!$tubeIsMissing) {
                throw $exception;
            }

            return null;
        }

        return $stats;
    }
98
99
    /**
     * Add a job to the queue.
     *
     * The payload is JSON-encoded and put in the tube named after the action.
     * Omitted (null) delay, priority and ttr fall back to the Pheanstalk
     * defaults and this queue's default ttr respectively.
     *
     * @param string     $action   The action
     * @param array      $payload  The job's payload
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws InvalidArgumentException When the action is not defined
     * @throws InvalidArgumentException When `$delay` is a string that cannot be parsed
     * @throws InvalidArgumentException When `$delay` or `$priority` is negative
     * @throws InvalidArgumentException When the payload cannot be JSON-encoded
     *
     * @return int The job id
     */
    public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
    {
        if (false === $this->executorPool->hasExecutor($action)) {
            throw new InvalidArgumentException(
                sprintf(
                    'Action "%s" is not defined in QueueManager',
                    $action
                )
            );
        }

        if (null === $delay) {
            $delay = PheanstalkInterface::DEFAULT_DELAY;
        }

        if (null === $priority) {
            $priority = PheanstalkInterface::DEFAULT_PRIORITY;
        }

        if (null === $ttr) {
            $ttr = $this->defaultTtr;
        }

        if (!is_numeric($delay)) {
            $scheduledAt = strtotime(sprintf('+ %s', $delay));

            // strtotime() returns false for unparseable input; without this
            // check `false - time()` becomes a large negative number and the
            // caller gets a misleading "job in the past" error.
            if (false === $scheduledAt) {
                throw new InvalidArgumentException(
                    sprintf('Could not parse delay "%s" as a relative date string', $delay)
                );
            }

            $delay = $scheduledAt - time();
        }

        if ($delay < 0) {
            throw new InvalidArgumentException(
                sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
            );
        }

        if ($priority < 0) {
            throw new InvalidArgumentException(
                sprintf('The priority for a job cannot be negative (was %d)', $priority)
            );
        }

        $encodedPayload = json_encode($payload);

        // json_encode() returns false on failure (e.g. malformed UTF-8);
        // refuse to queue a job whose payload could never be decoded.
        if (false === $encodedPayload) {
            throw new InvalidArgumentException(
                sprintf(
                    'The payload for action "%s" could not be JSON-encoded (json error code %d)',
                    $action,
                    json_last_error()
                )
            );
        }

        $jobId = $this->pheanstalk->putInTube($action, $encodedPayload, $priority, $delay, $ttr);

        $this->logJob(
            $jobId,
            sprintf(
                'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
                $action,
                $encodedPayload,
                $priority,
                $delay,
                $ttr
            )
        );

        return $jobId;
    }
171
172
    /**
     * Adds a job to the queue for an object.
     *
     * The executor registered for the action converts the object into a
     * payload, which is then queued through add().
     *
     * @param string     $action   The action
     * @param object     $object   The object to add a job for
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \LogicException          If the executor does not accept objects as payloads
     * @throws InvalidArgumentException If the executor does not accept the given object
     * @throws InvalidArgumentException When the action is not defined
     *
     * @return int The job id
     */
    public function addForObject($action, $object, $delay = null, $priority = null, $ttr = null)
    {
        $executor = $this->executorPool->getExecutor($action);

        if (!$executor instanceof ObjectPayloadInterface) {
            throw new \LogicException(
                sprintf(
                    'The executor for action "%s" cannot be used for objects. Implement the ObjectPayloadInterface in class "%s" to enable this.',
                    $action,
                    get_class($executor)
                )
            );
        }

        if (!$executor->supportsObject($object)) {
            // $object is untyped, so the rejected value may not be an object
            // at all; get_class() on a non-object would raise its own error
            // and hide the exception the caller is supposed to receive.
            $describedType = is_object($object) ? get_class($object) : gettype($object);

            throw new InvalidArgumentException(
                sprintf(
                    'The executor for action "%s" does not support %s objects',
                    $action,
                    $describedType
                )
            );
        }

        $payload = $executor->getObjectPayload($object);

        return $this->add($action, $payload, $delay, $priority, $ttr);
    }
217
218
    /**
     * Reschedules a job.
     *
     * Releases the job back to the queue with a delay equal to the number of
     * seconds between now and `$date`.
     *
     * @param Job       $job      The job to release
     * @param \DateTime $date     The moment the job should become ready again
     * @param integer   $priority Priority to release the job with
     *
     * @throws InvalidArgumentException When `$date` is in the past
     */
    public function reschedule(Job $job, \DateTime $date, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
    {
        if ($date < new \DateTime()) {
            throw new InvalidArgumentException(
                sprintf(
                    'You cannot reschedule a job in the past (got %s, and the current date is %s)',
                    $date->format(DATE_ISO8601),
                    date(DATE_ISO8601)
                )
            );
        }

        // The clock is read again here, after the check above. If a second
        // boundary passes in between, a date equal to "now" would yield -1,
        // so clamp to an immediate release instead of sending a negative delay.
        $delay = max(0, $date->getTimestamp() - time());

        $this->pheanstalk->release($job, $priority, $delay);

        $this->logJob($job->getId(), sprintf('Rescheduled job for %s', $date->format('Y-m-d H:i:s')));
    }
243
244
    /**
     * Starts watching the tube(s) of the given action(s), logging each one
     * at debug level.
     *
     * @param string|string[] $actions A single action or a list of actions
     */
    public function watch($actions)
    {
        $tubes = is_array($actions) ? $actions : [$actions];

        foreach ($tubes as $tube) {
            $this->pheanstalk->watch($tube);
            $this->logger->debug(sprintf('Watching tube "%s"', $tube));
        }
    }
259
260
    /**
     * Watches the given action(s) and ignores every other tube that was
     * being watched before the call.
     *
     * @param string|string[] $actions A single action or a list of actions
     */
    public function watchOnly($actions)
    {
        $requested = is_array($actions) ? $actions : [$actions];
        $previouslyWatched = (array) $this->pheanstalk->listTubesWatched();

        // Watch first, then ignore, so the ignore calls are never issued
        // before the requested tubes are on the watch list.
        $this->watch($requested);

        // Only drop tubes that were NOT requested: ignoring the full previous
        // list would un-watch a requested tube that was already being watched.
        $this->ignore(array_values(array_diff($previouslyWatched, $requested)));
    }
270
271
    /**
     * Stops watching the tube(s) of the given action(s), logging each one
     * at debug level.
     *
     * @param string|string[] $actions A single action or a list of actions
     */
    public function ignore($actions)
    {
        $tubes = is_array($actions) ? $actions : [$actions];

        foreach ($tubes as $tube) {
            $this->pheanstalk->ignore($tube);
            $this->logger->debug(sprintf('Ignoring tube "%s"', $tube));
        }
    }
286
287
    /**
     * Reserves the next job by delegating to Pheanstalk's reserve().
     *
     * @param int $timeout Passed through to reserve() unchanged (null by default)
     *
     * @return Job|bool A job if there is one, false otherwise
     */
    public function get($timeout = null)
    {
        $reserved = $this->pheanstalk->reserve($timeout);

        return $reserved;
    }
296
297
    /**
     * Inspects the next job from the queue. Note that this does not reserve
     * the job, so it will still be given to a worker if/once it's ready.
     *
     * @param string $action The action to peek
     * @param string $state  The state to peek for, can be 'ready', 'delayed' or 'buried'
     *
     * @throws InvalidArgumentException When $action is not a defined action
     * @throws InvalidArgumentException When $state is not a valid state
     * @throws Exception                Any Pheanstalk exception whose message does not contain "NOT_FOUND"
     *
     * @return Job|null The next job for the given state, or null if there is no next job
     */
    public function peek($action, $state = 'ready')
    {
        if (false === $this->executorPool->hasExecutor($action)) {
            throw new InvalidArgumentException(
                sprintf(
                    'Action "%s" is not defined in QueueManager',
                    $action
                )
            );
        }

        $states = ['ready', 'delayed', 'buried'];

        // Strict comparison: with loose comparison a non-string such as true
        // (or 0 on older PHP versions) matches a state name and would then be
        // turned into a non-existent peek method name below.
        if (!in_array($state, $states, true)) {
            throw new InvalidArgumentException(
                sprintf('$state must be one of %s, got %s', json_encode($states), json_encode($state))
            );
        }

        // Resolves to peekReady(), peekDelayed() or peekBuried().
        $peekMethod = sprintf('peek%s', ucfirst($state));

        try {
            return $this->pheanstalk->$peekMethod($action);
        } catch (Exception $exception) {
            // Pheanstalk signals "nothing to peek" with a NOT_FOUND exception.
            if (false !== strpos($exception->getMessage(), 'NOT_FOUND')) {
                return null;
            }

            throw $exception;
        }
    }
341
342
    /**
     * Permanently deletes a job and logs the deletion.
     *
     * @param Job $job The job to delete
     */
    public function delete(Job $job)
    {
        $this->pheanstalk->delete($job);
        $this->logJob($job->getId(), 'Job deleted');
    }
353
354
    /**
     * Puts a job into a 'buried' state, revived only by 'kick' command.
     * The burial is logged afterwards.
     *
     * @param Job $job The job to bury
     */
    public function bury(Job $job)
    {
        $this->pheanstalk->bury($job);
        $this->logJob($job->getId(), 'Job buried');
    }
365
366
    /**
     * Kicks jobs of the given action back onto the ready queue.
     *
     * Selects the action's tube with useTube() and then asks Pheanstalk to
     * kick at most `$max` jobs.
     *
     * @param string $action The action whose tube is kicked
     * @param int    $max    Upper bound passed to Pheanstalk's kick()
     *
     * @return int The number of kicked jobs
     */
    public function kick($action, $max)
    {
        $this->pheanstalk->useTube($action);
        $kickedCount = $this->pheanstalk->kick($max);

        $message = sprintf('Kicked %d "%s" jobs back onto the ready queue', $kickedCount, $action);
        $this->logger->debug($message);

        return $kickedCount;
    }
386
387
    /**
     * Fetches the statistics of a single job via Pheanstalk's statsJob().
     *
     * @param Job $job The job to inspect
     *
     * @return array
     */
    public function getJobStats(Job $job)
    {
        $stats = $this->pheanstalk->statsJob($job);

        return $stats;
    }
396
397
    /**
     * CAUTION: this removes all items from an action's queue.
     * This is an irreversible action!
     *
     * @param string $action The action whose jobs are removed
     * @param array  $states States to clear; an empty list means
     *                       'ready', 'delayed' and 'buried'
     */
    public function clear($action, array $states = [])
    {
        $targets = [] === $states ? ['ready', 'delayed', 'buried'] : $states;

        foreach ($targets as $target) {
            $this->clearTube($action, $target);
        }
    }
414
415
    /**
     * Deletes jobs in one state from a tube, one at a time, until peek()
     * reports no further job.
     *
     * @param string $tube  The tube (action) to clear
     * @param string $state The job state to clear, as accepted by peek()
     *
     * @throws Exception Any Pheanstalk exception raised while deleting whose
     *                   message does not contain "NOT_FOUND"
     */
    protected function clearTube($tube, $state = 'ready')
    {
        $this->logger->info(sprintf('Clearing all jobs with the "%s" state in tube "%s"', $state, $tube));

        $job = $this->peek($tube, $state);

        while ($job) {
            try {
                $this->delete($job);
            } catch (Exception $e) {
                // job could have been deleted by another process
                $alreadyGone = false !== strpos($e->getMessage(), 'NOT_FOUND');

                if (!$alreadyGone) {
                    throw $e;
                }
            }

            $job = $this->peek($tube, $state);
        }
    }
436
437
    /**
     * Logs a message prefixed with the job id in square brackets.
     *
     * @param int    $jobId   Id put in the "[id]" prefix
     * @param string $msg     The message to log
     * @param string $level   Log level, debug by default
     * @param array  $context Context handed to the logger unchanged
     */
    private function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
    {
        $line = sprintf('[%s] %s', $jobId, $msg);

        $this->logger->log($level, $line, $context);
    }
447
}
448