Completed
Pull Request — master (#30)
by Matthew
16:57
created

JobManager::setTube()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Manager\RetryableJobManager;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\Job as BaseJob;
8
use Dtc\QueueBundle\Exception\UnsupportedException;
9
use Pheanstalk\Pheanstalk;
10
11
/**
 * Job manager that stores, reserves and deletes jobs through a Pheanstalk client.
 *
 * When a tube has been configured via setTube(), jobs are put into that tube and
 * the tube is watched before reserving; otherwise the client's plain put()/reserve()
 * calls are used.
 */
class JobManager extends RetryableJobManager
{
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds

    /** @var Pheanstalk Client used for every queue operation in this class */
    protected $beanstalkd;

    /**
     * Tube to put jobs into and to watch when reserving; ignored while falsy.
     *
     * NOTE(review): presumably a string tube name or null - confirm against callers.
     */
    protected $tube;

    /** Timeout handed to Pheanstalk::reserve(); defaults to DEFAULT_RESERVE_TIMEOUT */
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;

    /**
     * Injects the Pheanstalk client used by this manager.
     *
     * @param Pheanstalk $beanstalkd
     */
    public function setBeanstalkd(Pheanstalk $beanstalkd)
    {
        $this->beanstalkd = $beanstalkd;
    }

    /**
     * Sets the tube used by putJob() and getJob().
     *
     * @param mixed $tube A falsy value means "no specific tube"
     */
    public function setTube($tube)
    {
        $this->tube = $tube;
    }

    /**
     * Overrides the timeout passed to reserve() when looking for a job.
     *
     * @param mixed $timeout Stored as-is; no validation is performed here
     */
    public function setReserveTimeout($timeout)
    {
        $this->reserveTimeout = $timeout;
    }

    /**
     * Saves a job by putting it on the queue.
     *
     * @param RetryableJob $job Must be an instance of this namespace's Job
     *
     * @return Job The same job, with its id and bean job populated
     *
     * @throws \InvalidArgumentException When $job is not a Job
     */
    public function retryableSave(RetryableJob $job)
    {
        if ($job instanceof Job) {
            return $this->putJob($job);
        }

        throw new \InvalidArgumentException('$job must be of type: '.Job::class);
    }

    /**
     * Puts the job's message on the queue and records the resulting id on the job.
     *
     * @param Job $job
     *
     * @return Job The job passed in, after setId() and setBeanJob() were called on it
     */
    protected function putJob(Job $job)
    {
        $payload = $job->toMessage();
        $callArgs = [$payload, $job->getPriority(), $job->getDelay(), $job->getTtr()];

        // With a tube configured, call putInTube() with the tube as leading argument;
        // otherwise call put() with the message arguments only.
        $intoTube = (bool) $this->tube;
        if ($intoTube) {
            $callArgs = array_merge([$this->tube], $callArgs);
        }
        $callName = $intoTube ? 'putInTube' : 'put';

        $newId = call_user_func_array([$this->beanstalkd, $callName], $callArgs);
        $job->setId($newId);

        // Build the bean job locally instead of asking beanstalk for it, which
        // would cost an extra round trip.
        $beanJob = $this->getBeanJob($newId, $payload);
        $job->setBeanJob($beanJob);

        return $job;
    }

    /**
     * Resets a job to its "new" state, bumps its retry counter and re-queues it.
     *
     * NOTE(review): static analysis flagged this method as duplicated elsewhere in
     * the project; consider extracting the shared reset logic into one place.
     *
     * @param RetryableJob $job Must be an instance of this namespace's Job
     *
     * @return bool Always true once the job has been put back on the queue
     *
     * @throws \InvalidArgumentException When $job is not a Job
     */
    protected function resetJob(RetryableJob $job)
    {
        $isBeanstalkdJob = $job instanceof Job;
        if (!$isBeanstalkdJob) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        // Setter order is kept exactly as-is in case any setter has side effects.
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $nextRetryCount = $job->getRetries() + 1;
        $job->setRetries($nextRetryCount);
        $job->setUpdatedAt(new \DateTime());

        $this->putJob($job);

        return true;
    }

    /**
     * Creates a Pheanstalk job object for the given id and payload.
     *
     * @param mixed $jobId Id as returned by the put call
     * @param mixed $data  Message payload of the job
     *
     * @return \Pheanstalk\Job
     */
    public function getBeanJob($jobId, $data)
    {
        $beanJob = new \Pheanstalk\Job($jobId, $data);

        return $beanJob;
    }

    /**
     * Reserves the next non-expired job from the queue.
     *
     * Expired jobs encountered along the way are deleted by findJob() and skipped.
     *
     * @param string|null     $workerName Must be null; filtering is not supported
     * @param string|null     $methodName Must be null; filtering is not supported
     * @param bool            $prioritize Not used by this implementation
     * @param int|string|null $runId      Run id recorded on the returned job
     *
     * @return Job|null Null when reserve() yields nothing
     *
     * @throws UnsupportedException When $workerName or $methodName is given
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        // Both unsupported filters raise the same exception with the same message.
        if (null !== $methodName || null !== $workerName) {
            throw new UnsupportedException('Unsupported');
        }

        $client = $this->tube ? $this->beanstalkd->watch($this->tube) : $this->beanstalkd;

        while (true) {
            $wasExpired = false;
            $candidate = $this->findJob($client, $wasExpired, $runId);
            if (!$wasExpired) {
                return $candidate;
            }
        }
    }

    /**
     * Reserves a single job and converts it into a Job, discarding it if expired.
     *
     * @param Pheanstalk      $beanstalkd Client to reserve from
     * @param bool            $expiredJob Set to true (by reference) when the reserved
     *                                    job had expired and was deleted
     * @param int|string|null $runId      Run id recorded on the job
     *
     * @return Job|null Null when nothing was reserved or the job had expired
     */
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
    {
        $reserved = $beanstalkd->reserve($this->reserveTimeout);
        if (!$reserved) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($reserved->getData());
        $job->setId($reserved->getId());
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        if ($expiresAt && $expiresAt->getTimestamp() < time()) {
            // Signal the caller to try again, and drop the stale job from the queue.
            $expiredJob = true;
            $beanstalkd->delete($reserved);

            return null;
        }

        $job->setBeanJob($reserved);

        return $job;
    }

    /**
     * Deletes the given job through the Pheanstalk client.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Passed straight to Pheanstalk::delete()
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $client = $this->beanstalkd;
        $client->delete($job);
    }

    /**
     * Called upon completion of the job; deletes it unless it is going to be retried.
     *
     * @param RetryableJob $job
     * @param mixed        $retry Truthy when the job will be retried (nothing is deleted)
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if ($retry) {
            return;
        }

        $this->beanstalkd->delete($job);
    }

    /**
     * Returns whatever the Pheanstalk client reports from stats().
     *
     * @return mixed
     */
    public function getStats()
    {
        $stats = $this->beanstalkd->stats();

        return $stats;
    }
}
167