Passed
Pull Request — master (#84)
by Joe
07:37
created

JobManager   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 161
Duplicated Lines 0 %

Test Coverage

Coverage 80.26%

Importance

Changes 0
Metric Value
wmc 25
dl 0
loc 161
ccs 61
cts 76
cp 0.8026
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A getWaitingJobCount() 0 5 2
A getBeanJob() 0 3 1
A deleteJob() 0 4 1
A findJob() 0 21 4
A setReserveTimeout() 0 3 1
A retryableSaveHistory() 0 5 2
A retryableSave() 0 7 2
A resetJob() 0 13 2
A setTube() 0 3 1
A setBeanstalkd() 0 3 1
A getStats() 0 3 1
A putJob() 0 17 2
A getJob() 0 20 5
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Manager\RetryableJobManager;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\Job as BaseJob;
8
use Dtc\QueueBundle\Exception\UnsupportedException;
9
use Dtc\QueueBundle\Util\Util;
10
use Pheanstalk\Pheanstalk;
11
12
/**
 * Job manager backed by a Pheanstalk (beanstalkd) connection.
 *
 * Jobs are serialised with Job::toMessage() when they are queued and rebuilt
 * with Job::fromMessage() when they are reserved. getJob() does not support
 * filtering by worker name or method name.
 */
class JobManager extends RetryableJobManager
{
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds

    /** @var Pheanstalk Connection used for every queue operation in this class */
    protected $beanstalkd;

    /**
     * Optional tube; when set (truthy), jobs are put into it and it is watched
     * before reserving. Presumably a tube name string - confirm against callers.
     *
     * @var mixed
     */
    protected $tube;

    /** @var int Timeout handed to Pheanstalk::reserve(); defaults to DEFAULT_RESERVE_TIMEOUT */
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;

    /**
     * Sets the Pheanstalk connection used by this manager.
     *
     * @param Pheanstalk $beanstalkd
     */
    public function setBeanstalkd(Pheanstalk $beanstalkd)
    {
        $this->beanstalkd = $beanstalkd;
    }

    /**
     * Sets the tube used when putting and reserving jobs.
     *
     * @param mixed $tube A falsy value means "no explicit tube"
     */
    public function setTube($tube)
    {
        $this->tube = $tube;
    }

    /**
     * Sets the timeout passed to reserve() when looking for a job.
     *
     * @param int $timeout Stored as given; no validation is performed here
     */
    public function setReserveTimeout($timeout)
    {
        $this->reserveTimeout = $timeout;
    }

    /**
     * Queues the given job in beanstalkd.
     *
     * @param RetryableJob $job Must be an instance of this namespace's Job class
     *
     * @return Job The same job, with its id and bean job populated
     *
     * @throws \InvalidArgumentException When $job is not a Job
     */
    public function retryableSave(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be of type: '.Job::class);
        }

        return $this->putJob($job);
    }

    /**
     * Puts the job's message on the queue and records the resulting id on the job.
     *
     * @param Job $job
     *
     * @return Job The job that was passed in
     */
    protected function putJob(Job $job)
    {
        $message = $job->toMessage();
        $priority = $job->getPriority();
        $delay = $job->getDelay();
        $ttr = $job->getTtr();

        // Call the Pheanstalk method directly rather than assembling its name at runtime.
        if ($this->tube) {
            $jobId = $this->beanstalkd->putInTube($this->tube, $message, $priority, $delay, $ttr);
        } else {
            $jobId = $this->beanstalkd->put($message, $priority, $delay, $ttr);
        }
        $job->setId($jobId);

        // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here
        $job->setBeanJob($this->getBeanJob($jobId, $message));

        return $job;
    }

    /**
     * Resets the job's state for another attempt and puts it back on the queue.
     *
     * Status is set to BaseJob::STATUS_NEW, message and start time are cleared,
     * the retry counter is incremented and the update time is refreshed.
     *
     * @param RetryableJob $job Must be an instance of this namespace's Job class
     *
     * @return bool Always true
     *
     * @throws \InvalidArgumentException When $job is not a Job
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($job->getRetries() + 1);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
        $this->putJob($job);

        return true;
    }

    /**
     * Builds a Pheanstalk job object from an id and its payload.
     *
     * @param mixed $jobId
     * @param mixed $data
     *
     * @return \Pheanstalk\Job
     */
    public function getBeanJob($jobId, $data)
    {
        return new \Pheanstalk\Job($jobId, $data);
    }

    /**
     * Reserves the next non-expired job from beanstalkd.
     *
     * Expired jobs encountered along the way are deleted by findJob() and the
     * search continues until a live job is found or nothing is reserved.
     *
     * @param string|null     $workerName Must be null; filtering is unsupported
     * @param string|null     $methodName Must be null; filtering is unsupported
     * @param bool            $prioritize Not used by this implementation
     * @param int|string|null $runId      Stored on the returned job
     *
     * @return Job|null
     *
     * @throws UnsupportedException When a worker name or method name is given
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        if (null !== $methodName || null !== $workerName) {
            throw new UnsupportedException('Unsupported');
        }

        $beanstalkd = $this->beanstalkd;
        if ($this->tube) {
            // NOTE(review): watch() adds the tube to the watch list; whether other
            // tubes should be ignored as well is not decided here - confirm.
            $beanstalkd = $this->beanstalkd->watch($this->tube);
        }

        do {
            // findJob() flips this flag (by reference) when it discards an expired job.
            $expiredJob = false;
            $job = $this->findJob($beanstalkd, $expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Reserves a single job and converts it into a Job, discarding it if expired.
     *
     * @param Pheanstalk      $beanstalkd
     * @param bool            $expiredJob Out-parameter: set to true when the reserved
     *                                    job had expired and was deleted
     * @param int|string|null $runId      Stored on the returned job
     *
     * @return Job|null Null when nothing was reserved or the reserved job had expired
     */
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
    {
        $beanJob = $beanstalkd->reserve($this->reserveTimeout);
        if (!$beanJob) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($beanJob->getData());
        $job->setId($beanJob->getId());
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        if ($expiresAt && $expiresAt->getTimestamp() < time()) {
            $expiredJob = true;
            $beanstalkd->delete($beanJob);

            return null;
        }

        $job->setBeanJob($beanJob);

        return $job;
    }

    /**
     * Deletes the given job from beanstalkd.
     *
     * @param BaseJob $job Passed straight to Pheanstalk::delete()
     */
    public function deleteJob(BaseJob $job)
    {
        $this->beanstalkd->delete($job);
    }

    /**
     * Called upon completion of the job; removes it from beanstalkd unless it
     * is going to be retried.
     *
     * @param RetryableJob $job
     * @param bool         $retry When truthy the job is left untouched here
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if ($retry) {
            return;
        }

        $this->beanstalkd->delete($job);
    }

    /**
     * Returns the 'current-jobs-ready' figure from the beanstalkd stats.
     *
     * @param string|null $workerName Not used by this implementation
     * @param string|null $methodName Not used by this implementation
     *
     * @return mixed The reported value, or 0 when the key is absent
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $stats = $this->getStats();
        if (isset($stats['current-jobs-ready'])) {
            return $stats['current-jobs-ready'];
        }

        return 0;
    }

    /**
     * Returns the result of Pheanstalk::stats() for the current connection.
     *
     * @return mixed
     */
    public function getStats()
    {
        return $this->beanstalkd->stats();
    }
}
175