Completed
Push — master ( 122dc2...fa7a2a )
by Matthew
09:15
created

JobManager::findJob()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 22
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 4.1574

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 11
cts 14
cp 0.7856
rs 8.9197
c 0
b 0
f 0
cc 4
eloc 14
nc 3
nop 3
crap 4.1574
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use Dtc\QueueBundle\Model\Job as BaseJob;
7
use Pheanstalk\Pheanstalk;
8
9
class JobManager extends AbstractJobManager
10
{
11
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds
12
13
    /** @var Pheanstalk */
14
    protected $beanstalkd;
15
16
    protected $tube;
17
18
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;
19
20 2
    public function setBeanstalkd(Pheanstalk $beanstalkd)
21
    {
22 2
        $this->beanstalkd = $beanstalkd;
23 2
    }
24
25
    public function setTube($tube)
26
    {
27
        $this->tube = $tube;
28
    }
29
30
    public function setReserveTimeout($timeout)
31
    {
32
        $this->reserveTimeout = $timeout;
33
    }
34
35 5
    public function save(\Dtc\QueueBundle\Model\Job $job)
36
    {
37
        /** @var Job $job */
38 5
        $message = $job->toMessage();
39 5
        $arguments = [$message, $job->getPriority(), $job->getDelay(), $job->getTtr()];
40 5
        $method = 'put';
41 5
        if ($this->tube) {
42
            array_unshift($arguments, $this->tube);
43
            $method .= 'InTube';
44
        }
45 5
        $jobId = call_user_func_array([$this->beanstalkd, $method], $arguments);
46 5
        $job->setId($jobId);
47
48
        // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here
49 5
        $job->setBeanJob($this->getBeanJob($jobId, $message));
50
51 5
        return $job;
52
    }
53
54 5
    public function getBeanJob($jobId, $data)
55
    {
56 5
        return new \Pheanstalk\Job($jobId, $data);
57
    }
58
59
    /**
60
     * @param string|null     $workerName
61
     * @param string|null     $methodName
62
     * @param bool            $prioritize
63
     * @param int|string|null $runId
64
     *
65
     * @return Job|null
66
     *
67
     * @throws \Exception
68
     */
69 6
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
70
    {
71 6
        if (null !== $methodName) {
72
            throw new \Exception('Unsupported');
73
        }
74 6
        if (null !== $workerName) {
75 1
            throw new \Exception('Unsupported');
76
        }
77
78 5
        $beanstalkd = $this->beanstalkd;
79 5
        if ($this->tube) {
80
            $beanstalkd = $this->beanstalkd->watch($this->tube);
81
        }
82
83
        do {
84 5
            $expiredJob = false;
85 5
            $job = $this->findJob($beanstalkd, $expiredJob, $runId);
86 5
        } while ($expiredJob);
87
88 5
        return $job;
89
    }
90
91
    /**
92
     * @param Pheanstalk      $beanstalkd
93
     * @param bool            $expiredJob
94
     * @param int|string|null $runId
95
     *
96
     * @return Job|null
97
     */
98 5
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
99
    {
100 5
        $beanJob = $beanstalkd->reserve($this->reserveTimeout);
101 5
        if ($beanJob) {
102 4
            $job = new Job();
103 4
            $job->fromMessage($beanJob->getData());
104 4
            $job->setId($beanJob->getId());
105 4
            $job->setRunId($runId);
106
107 4
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
108
                $expiredJob = true;
109
                $beanstalkd->delete($beanJob);
110
111
                return null;
112
            }
113 4
            $job->setBeanJob($beanJob);
114
115 4
            return $job;
116
        }
117
118 3
        return null;
119
    }
120
121 2
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
122
    {
123 2
        $this->beanstalkd
124 2
            ->delete($job);
125 2
    }
126
127
    // Save History get called upon completion of the job
128 1
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
129
    {
130 1
        if (BaseJob::STATUS_SUCCESS === $job->getStatus()) {
131 1
            $this->beanstalkd
132 1
                ->delete($job);
133
        }
134 1
    }
135
136 1
    public function getStats()
137
    {
138 1
        return $this->beanstalkd->stats();
139
    }
140
}
141