Completed
Pull Request — master (#30)
by Matthew
23:29 queued 08:10
created

JobManager::retryableSave()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.0625

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 3
cts 4
cp 0.75
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 1
crap 2.0625
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Manager\RetryableJobManager;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\Job as BaseJob;
8
use Dtc\QueueBundle\Exception\UnsupportedException;
9
use Dtc\QueueBundle\Util\Util;
10
use Pheanstalk\Pheanstalk;
11
12
class JobManager extends RetryableJobManager
13
{
14
    const DEFAULT_RESERVE_TIMEOUT = 5; // seconds
15
16
    /** @var Pheanstalk */
17
    protected $beanstalkd;
18
19
    protected $tube;
20
21
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;
22
23 2
    public function setBeanstalkd(Pheanstalk $beanstalkd)
24
    {
25 2
        $this->beanstalkd = $beanstalkd;
26 2
    }
27
28
    public function setTube($tube)
29
    {
30
        $this->tube = $tube;
31
    }
32
33
    public function setReserveTimeout($timeout)
34
    {
35
        $this->reserveTimeout = $timeout;
36
    }
37
38 7
    public function retryableSave(RetryableJob $job)
39
    {
40 7
        if (!$job instanceof Job) {
41
            throw new \InvalidArgumentException('$job must be of type: '.Job::class);
42
        }
43
44 7
        return $this->putJob($job);
45
    }
46
47 7
    protected function putJob(Job $job)
48
    {
49
        /** @var Job $job */
50 7
        $message = $job->toMessage();
51 7
        $arguments = [$message, $job->getPriority(), $job->getDelay(), $job->getTtr()];
52 7
        $method = 'put';
53 7
        if ($this->tube) {
54
            array_unshift($arguments, $this->tube);
55
            $method .= 'InTube';
56
        }
57 7
        $jobId = call_user_func_array([$this->beanstalkd, $method], $arguments);
58 7
        $job->setId($jobId);
59
60
        // Ideally we should get this from beanstalk, but to save the roundtrip time, we do this here
61 7
        $job->setBeanJob($this->getBeanJob($jobId, $message));
62
63 7
        return $job;
64
    }
65
66 2 View Code Duplication
    protected function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
67
    {
68 2
        if (!$job instanceof Job) {
69
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
70
        }
71 2
        $job->setStatus(BaseJob::STATUS_NEW);
72 2
        $job->setMessage(null);
73 2
        $job->setStartedAt(null);
74 2
        $job->setRetries($job->getRetries() + 1);
75 2
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
76 2
        $this->putJob($job);
77
78 2
        return true;
79
    }
80
81 7
    public function getBeanJob($jobId, $data)
82
    {
83 7
        return new \Pheanstalk\Job($jobId, $data);
84
    }
85
86
    /**
87
     * @param string|null     $workerName
88
     * @param string|null     $methodName
89
     * @param bool            $prioritize
90
     * @param int|string|null $runId
91
     *
92
     * @return Job|null
93
     *
94
     * @throws UnsupportedException
95
     */
96 6
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
97
    {
98 6
        if (null !== $methodName) {
99
            throw new UnsupportedException('Unsupported');
100
        }
101 6
        if (null !== $workerName) {
102 1
            throw new UnsupportedException('Unsupported');
103
        }
104
105 5
        $beanstalkd = $this->beanstalkd;
106 5
        if ($this->tube) {
107
            $beanstalkd = $this->beanstalkd->watch($this->tube);
108
        }
109
110
        do {
111 5
            $expiredJob = false;
112 5
            $job = $this->findJob($beanstalkd, $expiredJob, $runId);
113 5
        } while ($expiredJob);
114
115 5
        return $job;
116
    }
117
118
    /**
119
     * @param Pheanstalk      $beanstalkd
120
     * @param bool            $expiredJob
121
     * @param int|string|null $runId
122
     *
123
     * @return Job|null
124
     */
125 5
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
126
    {
127 5
        $beanJob = $beanstalkd->reserve($this->reserveTimeout);
128 5
        if ($beanJob) {
129 4
            $job = new Job();
130 4
            $job->fromMessage($beanJob->getData());
131 4
            $job->setId($beanJob->getId());
132 4
            $job->setRunId($runId);
133
134 4
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
135
                $expiredJob = true;
136
                $beanstalkd->delete($beanJob);
137
138
                return null;
139
            }
140 4
            $job->setBeanJob($beanJob);
141
142 4
            return $job;
143
        }
144
145 3
        return null;
146
    }
147
148 2
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
149
    {
150 2
        $this->beanstalkd
151 2
            ->delete($job);
152 2
    }
153
154
    // Save History get called upon completion of the job
155 3
    public function retryableSaveHistory(RetryableJob $job, $retry)
156
    {
157 3
        if (!$retry) {
158 3
            $this->beanstalkd
159 3
                ->delete($job);
160
        }
161 3
    }
162
163 1
    public function getStats()
164
    {
165 1
        return $this->beanstalkd->stats();
166
    }
167
}
168