Completed
Push — master ( 4109b7...859c70 )
by Matthew
08:07 queued 02:26
created

JobManager   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 66.13%

Importance

Changes 0
Metric Value
wmc 19
lcom 1
cbo 6
dl 0
loc 132
ccs 41
cts 62
cp 0.6613
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A setBeanstalkd() 0 4 1
A setTube() 0 4 1
A setReserveTimeout() 0 4 1
A getBeanJob() 0 4 1
B findJob() 0 22 4
A deleteJob() 0 5 1
A getStats() 0 4 1
A save() 0 18 2
B getJob() 0 21 5
A saveHistory() 0 7 2
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use Dtc\QueueBundle\Model\Job as BaseJob;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Pheanstalk\Pheanstalk;
9
10
class JobManager extends AbstractJobManager
11
{
12
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds
13
14
    /** @var Pheanstalk */
15
    protected $beanstalkd;
16
17
    protected $tube;
18
19
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;
20
21 1
    public function setBeanstalkd(Pheanstalk $beanstalkd)
22
    {
23 1
        $this->beanstalkd = $beanstalkd;
24 1
    }
25
26
    public function setTube($tube)
27
    {
28
        $this->tube = $tube;
29
    }
30
31
    public function setReserveTimeout($timeout)
32
    {
33
        $this->reserveTimeout = $timeout;
34
    }
35
36 4
    public function save(\Dtc\QueueBundle\Model\Job $job)
37
    {
38
        /** @var Job $job */
39 4
        $message = $job->toMessage();
40 4
        $arguments = [$message, $job->getPriority(), $job->getDelay(), $job->getTtr()];
41 4
        $method = 'put';
42 4
        if ($this->tube) {
43
            array_unshift($arguments, $this->tube);
44
            $method .= 'InTube';
45
        }
46 4
        $jobId = call_user_func_array([$this->beanstalkd, $method], $arguments);
47 4
        $job->setId($jobId);
48
49
        // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here
50 4
        $job->setBeanJob($this->getBeanJob($jobId, $message));
51
52 4
        return $job;
53
    }
54
55 4
    public function getBeanJob($jobId, $data)
56
    {
57 4
        return new \Pheanstalk\Job($jobId, $data);
58
    }
59
60
    /**
61
     * @param string|null     $workerName
62
     * @param string|null     $methodName
63
     * @param bool            $prioritize
64
     * @param int|string|null $runId
65
     *
66
     * @return Job|null
67
     *
68
     * @throws UnsupportedException
69
     */
70 5
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
71
    {
72 5
        if (null !== $methodName) {
73
            throw new UnsupportedException('Unsupported');
74
        }
75 5
        if (null !== $workerName) {
76 1
            throw new UnsupportedException('Unsupported');
77
        }
78
79 4
        $beanstalkd = $this->beanstalkd;
80 4
        if ($this->tube) {
81
            $beanstalkd = $this->beanstalkd->watch($this->tube);
82
        }
83
84
        do {
85 4
            $expiredJob = false;
86 4
            $job = $this->findJob($beanstalkd, $expiredJob, $runId);
87 4
        } while ($expiredJob);
88
89 4
        return $job;
90
    }
91
92
    /**
93
     * @param Pheanstalk      $beanstalkd
94
     * @param bool            $expiredJob
95
     * @param int|string|null $runId
96
     *
97
     * @return Job|null
98
     */
99 4
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
100
    {
101 4
        $beanJob = $beanstalkd->reserve($this->reserveTimeout);
102 4
        if ($beanJob) {
103 3
            $job = new Job();
104 3
            $job->fromMessage($beanJob->getData());
105 3
            $job->setId($beanJob->getId());
106 3
            $job->setRunId($runId);
107
108 3
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
109
                $expiredJob = true;
110
                $beanstalkd->delete($beanJob);
111
112
                return null;
113
            }
114 3
            $job->setBeanJob($beanJob);
115
116 3
            return $job;
117
        }
118
119 2
        return null;
120
    }
121
122 2
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
123
    {
124 2
        $this->beanstalkd
125 2
            ->delete($job);
126 2
    }
127
128
    // Save History get called upon completion of the job
129
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
130
    {
131
        if (BaseJob::STATUS_SUCCESS === $job->getStatus()) {
132
            $this->beanstalkd
133
                ->delete($job);
134
        }
135
    }
136
137 1
    public function getStats()
138
    {
139 1
        return $this->beanstalkd->stats();
140
    }
141
}
142