Completed
Push — master ( 58e792...f01f49 )
by Matthew
05:34
created

JobManager   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 103
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 57.41%

Importance

Changes 0
Metric Value
wmc 17
lcom 1
cbo 5
dl 0
loc 103
ccs 31
cts 54
cp 0.5741
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A setBeanstalkd() 0 4 1
A setTube() 0 4 1
A setReserveTimeout() 0 4 1
A save() 0 18 2
A getBeanJob() 0 4 1
C getJob() 0 32 7
A deleteJob() 0 5 1
A saveHistory() 0 7 2
A getStats() 0 4 1
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use Dtc\QueueBundle\Model\Job as BaseJob;
7
use Pheanstalk\Pheanstalk;
8
9
/**
 * Job manager that stores and fetches queue jobs through a beanstalkd server,
 * talking to it via the Pheanstalk client.
 */
class JobManager extends AbstractJobManager
{
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds

    /** @var Pheanstalk */
    protected $beanstalkd;

    /**
     * Tube name. When empty, save() calls put() instead of putInTube() and
     * getJob() does not call watch().
     *
     * @var string|null
     */
    protected $tube;

    /** @var int Timeout handed to Pheanstalk::reserve(); the default is in seconds */
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;

    /**
     * Sets the Pheanstalk client used for every beanstalkd operation.
     *
     * @param Pheanstalk $beanstalkd
     */
    public function setBeanstalkd(Pheanstalk $beanstalkd)
    {
        $this->beanstalkd = $beanstalkd;
    }

    /**
     * Sets the tube that jobs are put into and reserved from.
     *
     * @param string|null $tube
     */
    public function setTube($tube)
    {
        $this->tube = $tube;
    }

    /**
     * Sets the timeout passed to reserve() when fetching a job.
     *
     * @param int $timeout
     */
    public function setReserveTimeout($timeout)
    {
        $this->reserveTimeout = $timeout;
    }

    /**
     * Puts the job on beanstalkd and records the returned id on the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the same job, with its id and bean job set
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var Job $job */
        $message = $job->toMessage();
        $priority = $job->getPriority();
        $delay = $job->getDelay();
        $ttr = $job->getTtr();

        if ($this->tube) {
            $jobId = $this->beanstalkd->putInTube($this->tube, $message, $priority, $delay, $ttr);
        } else {
            $jobId = $this->beanstalkd->put($message, $priority, $delay, $ttr);
        }
        $job->setId($jobId);

        // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here
        $job->setBeanJob($this->getBeanJob($jobId, $message));

        return $job;
    }

    /**
     * Builds a Pheanstalk job object locally from an id and its payload.
     *
     * @param mixed $jobId
     * @param mixed $data
     *
     * @return \Pheanstalk\Job
     */
    public function getBeanJob($jobId, $data)
    {
        return new \Pheanstalk\Job($jobId, $data);
    }

    /**
     * Reserves the next job, deleting and skipping any job whose expiry time has passed.
     *
     * $workerName and $prioritize are accepted but not used by this implementation.
     *
     * @param string|null $workerName
     * @param string|null $methodName must be empty; any truthy value is rejected
     * @param bool        $prioritize
     * @param mixed       $runId      stored on the returned job via setRunId()
     *
     * @return Job|null the reserved job, or null when reserve() yields no job
     *
     * @throws \Exception when $methodName is given
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        if ($methodName) {
            throw new \Exception('Unsupported');
        }

        $beanstalkd = $this->beanstalkd;
        if ($this->tube) {
            $beanstalkd = $this->beanstalkd->watch($this->tube);
        }

        do {
            // Reset on every pass: a stale "true" left over from an earlier expired job
            // would keep this loop polling after reserve() comes back empty.
            $expiredJob = false;

            $beanJob = $beanstalkd->reserve($this->reserveTimeout);
            if ($beanJob) {
                $job = new Job();
                $job->fromMessage($beanJob->getData());
                $job->setId($beanJob->getId());
                $job->setRunId($runId);

                if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
                    // Expired: remove it from beanstalkd and try to reserve another one.
                    $expiredJob = true;
                    $this->beanstalkd->delete($beanJob);
                    continue;
                }
                $job->setBeanJob($beanJob);

                return $job;
            }
        } while ($expiredJob);

        return null;
    }

    /**
     * Deletes the job from beanstalkd.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $this->beanstalkd
            ->delete($job);
    }

    /**
     * Called upon completion of a job; deletes the job from beanstalkd when it
     * finished with STATUS_SUCCESS and does nothing otherwise.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if (BaseJob::STATUS_SUCCESS === $job->getStatus()) {
            $this->beanstalkd
                ->delete($job);
        }
    }

    /**
     * Returns the result of the Pheanstalk client's stats() call.
     *
     * @return mixed
     */
    public function getStats()
    {
        return $this->beanstalkd->stats();
    }
}
112