Passed
Push — master ( 26ea1a...43abdf )
by Matthew
07:12
created

JobManager::putJob()   B

Complexity

Conditions 7
Paths 16

Size

Total Lines 30
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 7.0422

Importance

Changes 0
Metric Value
cc 7
eloc 20
c 0
b 0
f 0
nc 16
nop 1
dl 0
loc 30
ccs 19
cts 21
cp 0.9048
crap 7.0422
rs 8.6666
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Manager\RetryableJobManager;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\Job as BaseJob;
8
use Dtc\QueueBundle\Exception\UnsupportedException;
9
use Dtc\QueueBundle\Util\Util;
10
use Pheanstalk\Pheanstalk;
11
12
class JobManager extends RetryableJobManager
13
{
14
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds
15
16
    /** @var Pheanstalk */
17
    protected $beanstalkd;
18
19
    protected $tube;
20
21
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;
22
23 2
    public function setBeanstalkd(Pheanstalk $beanstalkd)
24
    {
25 2
        $this->beanstalkd = $beanstalkd;
26 2
    }
27
28
    public function setTube($tube)
29
    {
30
        $this->tube = $tube;
31
    }
32
33
    public function setReserveTimeout($timeout)
34
    {
35
        $this->reserveTimeout = $timeout;
36
    }
37
38 7
    public function retryableSave(RetryableJob $job)
39
    {
40 7
        if (!$job instanceof Job) {
41
            throw new \InvalidArgumentException('$job must be of type: '.Job::class);
42
        }
43
44 7
        return $this->putJob($job);
45
    }
46
47 7
    protected function putJob(Job $job)
48
    {
49
        /** @var Job $job */
50 7
        $message = $job->toMessage();
51 7
        $arguments = [$message];
52 7
        if ($job->getPriority() !== null) {
0 ignored issues
show
introduced by
The condition $job->getPriority() !== null is always true.
Loading history...
53 1
            $arguments[] = $job->getPriority();
54
        }
55 7
        if ($job->getDelay() !== null) {
0 ignored issues
show
introduced by
The condition $job->getDelay() !== null is always true.
Loading history...
56 2
            while (count($arguments) < 2) {
57 2
                $arguments[] = 0;
58
            }
59 2
            $arguments[] = $job->getDelay();
60
        }
61 7
        if ($job->getTtr() !== null) {
0 ignored issues
show
introduced by
The condition $job->getTtr() !== null is always true.
Loading history...
62 7
            while (count($arguments) < 3) {
63 5
                $arguments[] = 0;
64
            }
65 7
            $arguments[] = $job->getTtr();
66
        }
67 7
        $method = 'put';
68 7
        if ($this->tube) {
69
            array_unshift($arguments, $this->tube);
70
            $method .= 'InTube';
71
        }
72 7
        $beanJob = call_user_func_array([$this->beanstalkd, $method], $arguments);
73 7
        $job->setId($beanJob->getId());
74 7
        $job->setBeanJob($beanJob);
75
76 7
        return $job;
77
    }
78
79 2
    protected function resetJob(RetryableJob $job)
80
    {
81 2
        if (!$job instanceof Job) {
82
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
83
        }
84 2
        $job->setStatus(BaseJob::STATUS_NEW);
85 2
        $job->setMessage(null);
86 2
        $job->setStartedAt(null);
87 2
        $job->setRetries($job->getRetries() + 1);
88 2
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
89 2
        $this->putJob($job);
90
91 2
        return true;
92
    }
93
94
    public function getBeanJob($jobId, $data)
95
    {
96
        return new \Pheanstalk\Job($jobId, $data);
97
    }
98
99
    /**
100
     * @param string|null     $workerName
101
     * @param string|null     $methodName
102
     * @param bool            $prioritize
103
     * @param int|string|null $runId
104
     *
105
     * @return Job|null
106
     *
107
     * @throws UnsupportedException
108
     */
109 7
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
110
    {
111 7
        if (null !== $methodName) {
112
            throw new UnsupportedException('Unsupported');
113
        }
114 7
        if (null !== $workerName) {
115 1
            throw new UnsupportedException('Unsupported');
116
        }
117
118 6
        $beanstalkd = $this->beanstalkd;
119 6
        if ($this->tube) {
120
            $beanstalkd = $this->beanstalkd->watch($this->tube);
121
        }
122
123
        do {
124 6
            $expiredJob = false;
125 6
            $job = $this->findJob($beanstalkd, $expiredJob, $runId);
126 6
        } while ($expiredJob);
127
128 6
        return $job;
129
    }
130
131
    /**
132
     * @param Pheanstalk      $beanstalkd
133
     * @param bool            $expiredJob
134
     * @param int|string|null $runId
135
     *
136
     * @return Job|null
137
     */
138 6
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
139
    {
140 6
        $beanJob = $beanstalkd->reserveWithTimeout($this->reserveTimeout);
141 6
        if ($beanJob) {
142 5
            $job = new Job();
143 5
            $job->fromMessage($beanJob->getData());
144 5
            $job->setId($beanJob->getId());
145 5
            $job->setRunId($runId);
146
147 5
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
148
                $expiredJob = true;
149
                $beanstalkd->delete($beanJob);
150
151
                return null;
152
            }
153 5
            $job->setBeanJob($beanJob);
154
155 5
            return $job;
156
        }
157
158 5
        return null;
159
    }
160
161 2
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
162
    {
163 2
        $this->beanstalkd
164 2
            ->delete($job->getBeanJob());
0 ignored issues
show
Bug introduced by
The method getBeanJob() does not exist on Dtc\QueueBundle\Model\Job. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

164
            ->delete($job->/** @scrutinizer ignore-call */ getBeanJob());
Loading history...
165 2
    }
166
167
    // Save History get called upon completion of the job
168 4
    public function retryableSaveHistory(RetryableJob $job, $retry)
169
    {
170 4
        if (!$retry) {
171 4
            $this->beanstalkd
172 4
                ->delete($job->getBeanJob());
0 ignored issues
show
Bug introduced by
The method getBeanJob() does not exist on Dtc\QueueBundle\Model\RetryableJob. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

172
                ->delete($job->/** @scrutinizer ignore-call */ getBeanJob());
Loading history...
173
        }
174 4
    }
175
176 3
    public function getWaitingJobCount($workerName = null, $methodName = null)
177
    {
178 3
        $stats = $this->getStats();
179
180 3
        return isset($stats['current-jobs-ready']) ? $stats['current-jobs-ready'] : 0;
181
    }
182
183 3
    public function getStats()
184
    {
185 3
        return $this->beanstalkd->stats();
186
    }
187
}
188