JobManager   A
last analyzed

Complexity

Total Complexity 30

Size/Duplication

Total Lines 171
Duplicated Lines 0 %

Test Coverage

Coverage 83.33%

Importance

Changes 1
Bugs 1 Features 0
Metric Value
eloc 72
dl 0
loc 171
ccs 70
cts 84
cp 0.8333
rs 10
c 1
b 1
f 0
wmc 30

13 Methods

Rating   Name   Duplication   Size   Complexity  
A retryableSave() 0 7 2
A setReserveTimeout() 0 3 1
A setTube() 0 3 1
A setBeanstalkd() 0 3 1
A getWaitingJobCount() 0 5 2
A getBeanJob() 0 3 1
A deleteJob() 0 4 1
A findJob() 0 21 4
A retryableSaveHistory() 0 5 2
B putJob() 0 28 7
A resetJob() 0 13 2
A getJob() 0 20 5
A getStats() 0 3 1
1
<?php
2
3
namespace Dtc\QueueBundle\Beanstalkd;
4
5
use Dtc\QueueBundle\Exception\UnsupportedException;
6
use Dtc\QueueBundle\Manager\RetryableJobManager;
7
use Dtc\QueueBundle\Model\Job as BaseJob;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Util\Util;
10
use Pheanstalk\Pheanstalk;
0 ignored issues
show
Bug introduced by
The type Pheanstalk\Pheanstalk was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
11
12
class JobManager extends RetryableJobManager
13
{
14
    public const DEFAULT_RESERVE_TIMEOUT = 5; // seconds
15
16
    /** @var Pheanstalk */
17
    protected $beanstalkd;
18
19
    protected $tube;
20
21
    protected $reserveTimeout = self::DEFAULT_RESERVE_TIMEOUT;
22
23 2
    public function setBeanstalkd(Pheanstalk $beanstalkd)
24
    {
25 2
        $this->beanstalkd = $beanstalkd;
26 2
    }
27
28
    public function setTube($tube)
29
    {
30
        $this->tube = $tube;
31
    }
32
33
    public function setReserveTimeout($timeout)
34
    {
35
        $this->reserveTimeout = $timeout;
36
    }
37
38 13
    public function retryableSave(RetryableJob $job)
39
    {
40 13
        if (!$job instanceof Job) {
41
            throw new \InvalidArgumentException('$job must be of type: '.Job::class);
42
        }
43
44 13
        return $this->putJob($job);
45
    }
46
47 13
    protected function putJob(Job $job)
48
    {
49
        /** @var Job $job */
50 13
        $message = $job->toMessage();
51 13
        $arguments = [$message];
52 13
        if (null !== $job->getPriority()) {
0 ignored issues
show
introduced by
The condition null !== $job->getPriority() is always true.
Loading history...
53 2
            $arguments[] = $job->getPriority();
54
        }
55 13
        if (null !== $job->getDelay()) {
0 ignored issues
show
introduced by
The condition null !== $job->getDelay() is always true.
Loading history...
56 3
            while (count($arguments) < 2) {
57 3
                $arguments[] = 0;
58
            }
59 3
            $arguments[] = $job->getDelay();
60
        }
61 13
        if (null !== $job->getTtr()) {
0 ignored issues
show
introduced by
The condition null !== $job->getTtr() is always true.
Loading history...
62 13
            while (count($arguments) < 3) {
63 10
                $arguments[] = 0;
64
            }
65 13
            $arguments[] = $job->getTtr();
66
        }
67 13
        if ($this->tube) {
68 6
            $this->beanstalkd->useTube($this->tube);
69
        }
70 13
        $beanJob = call_user_func_array([$this->beanstalkd, 'put'], $arguments);
71 13
        $job->setId($beanJob->getId());
72 13
        $job->setBeanJob($beanJob);
73
74 13
        return $job;
75
    }
76
77 4
    protected function resetJob(RetryableJob $job)
78
    {
79 4
        if (!$job instanceof Job) {
80
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
81
        }
82 4
        $job->setStatus(BaseJob::STATUS_NEW);
83 4
        $job->setMessage(null);
84 4
        $job->setStartedAt(null);
85 4
        $job->setRetries($job->getRetries() + 1);
86 4
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
87 4
        $this->putJob($job);
88
89 4
        return true;
90
    }
91
92
    public function getBeanJob($jobId, $data)
93
    {
94
        return new \Pheanstalk\Job($jobId, $data);
0 ignored issues
show
Bug introduced by
The type Pheanstalk\Job was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
95
    }
96
97
    /**
98
     * @param string|null     $workerName
99
     * @param string|null     $methodName
100
     * @param bool            $prioritize
101
     * @param int|string|null $runId
102
     *
103
     * @return Job|null
104
     *
105
     * @throws UnsupportedException
106
     */
107 13
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
108
    {
109 13
        if (null !== $methodName) {
110
            throw new UnsupportedException('Unsupported');
111
        }
112 13
        if (null !== $workerName) {
113 2
            throw new UnsupportedException('Unsupported');
114
        }
115
116 11
        $beanstalkd = $this->beanstalkd;
117 11
        if ($this->tube) {
118 5
            $beanstalkd = $this->beanstalkd->watch($this->tube);
119
        }
120
121
        do {
122 11
            $expiredJob = false;
123 11
            $job = $this->findJob($beanstalkd, $expiredJob, $runId);
124 11
        } while ($expiredJob);
125
126 11
        return $job;
127
    }
128
129
    /**
130
     * @param bool            $expiredJob
131
     * @param int|string|null $runId
132
     *
133
     * @return Job|null
134
     */
135 11
    protected function findJob(Pheanstalk $beanstalkd, &$expiredJob, $runId)
136
    {
137 11
        $beanJob = $beanstalkd->reserveWithTimeout($this->reserveTimeout);
138 11
        if ($beanJob) {
139 9
            $job = new Job();
140 9
            $job->fromMessage($beanJob->getData());
141 9
            $job->setId($beanJob->getId());
142 9
            $job->setRunId($runId);
143
144 9
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
145
                $expiredJob = true;
146
                $beanstalkd->delete($beanJob);
147
148
                return null;
149
            }
150 9
            $job->setBeanJob($beanJob);
151
152 9
            return $job;
153
        }
154
155 11
        return null;
156
    }
157
158 4
    public function deleteJob(BaseJob $job)
159
    {
160 4
        $this->beanstalkd
161 4
            ->delete($job->getBeanJob());
0 ignored issues
show
Bug introduced by
The method getBeanJob() does not exist on Dtc\QueueBundle\Model\Job. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

161
            ->delete($job->/** @scrutinizer ignore-call */ getBeanJob());
Loading history...
162 4
    }
163
164
    // Save History get called upon completion of the job
165 7
    public function retryableSaveHistory(RetryableJob $job, $retry)
166
    {
167 7
        if (!$retry) {
168 7
            $this->beanstalkd
169 7
                ->delete($job->getBeanJob());
0 ignored issues
show
Bug introduced by
The method getBeanJob() does not exist on Dtc\QueueBundle\Model\RetryableJob. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

169
                ->delete($job->/** @scrutinizer ignore-call */ getBeanJob());
Loading history...
170
        }
171 7
    }
172
173 5
    public function getWaitingJobCount($workerName = null, $methodName = null)
174
    {
175 5
        $stats = $this->getStats();
176
177 5
        return isset($stats['current-jobs-ready']) ? $stats['current-jobs-ready'] : 0;
178
    }
179
180 5
    public function getStats()
181
    {
182 5
        return $this->beanstalkd->statsTube($this->beanstalkd->listTubeUsed());
183
    }
184
}
185