JobManager   A
last analyzed

Complexity

Total Complexity 38

Size/Duplication

Total Lines 260
Duplicated Lines 0 %

Test Coverage

Coverage 96.46%

Importance

Changes 4
Bugs 0 Features 0
Metric Value
eloc 98
c 4
b 0
f 0
dl 0
loc 260
ccs 109
cts 113
cp 0.9646
rs 9.36
wmc 38

18 Methods

Rating   Name   Duplication   Size   Complexity  
A setAMQPConnection() 0 4 1
A setExchangeArgs() 0 3 1
A getChannel() 0 3 1
A checkChannelArgs() 0 7 3
A performChannelSetup() 0 9 2
A setupChannel() 0 7 2
A __construct() 0 5 2
A setQueueArgs() 0 10 3
A resetJob() 0 14 2
A findJob() 0 21 4
A retryableSaveHistory() 0 11 3
A __destruct() 0 2 1
A prioritySave() 0 14 2
A publishJob() 0 6 1
A calculatePriority() 0 8 2
A getJob() 0 11 2
A setMsgPriority() 0 5 2
A getWaitingJobCount() 0 14 4
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
6
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
7
use Dtc\QueueBundle\Exception\PriorityException;
8
use Dtc\QueueBundle\Exception\UnsupportedException;
9
use Dtc\QueueBundle\Manager\JobIdTrait;
10
use Dtc\QueueBundle\Manager\JobTimingManager;
11
use Dtc\QueueBundle\Manager\PriorityJobManager;
12
use Dtc\QueueBundle\Manager\RunManager;
13
use Dtc\QueueBundle\Manager\SaveableTrait;
14
use Dtc\QueueBundle\Manager\VerifyTrait;
15
use Dtc\QueueBundle\Model\BaseJob;
16
use Dtc\QueueBundle\Model\JobTiming;
17
use Dtc\QueueBundle\Model\RetryableJob;
18
use Dtc\QueueBundle\Util\Util;
19
use PhpAmqpLib\Channel\AMQPChannel;
0 ignored issues
show
Bug introduced by
The type PhpAmqpLib\Channel\AMQPChannel was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
20
use PhpAmqpLib\Connection\AbstractConnection;
0 ignored issues
show
Bug introduced by
The type PhpAmqpLib\Connection\AbstractConnection was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
21
use PhpAmqpLib\Message\AMQPMessage;
0 ignored issues
show
Bug introduced by
The type PhpAmqpLib\Message\AMQPMessage was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
22
23
class JobManager extends PriorityJobManager
24
{
25
    use JobIdTrait;
26
    use SaveableTrait;
27
    use VerifyTrait;
28
29
    /** @var AMQPChannel */
30
    protected $channel;
31
32
    /** @var AbstractConnection */
33
    protected $connection;
34
    protected $queueArgs;
35
    protected $exchangeArgs;
36
37
    protected $channelSetup = false;
38
39
    protected $hostname;
40
    protected $pid;
41
42 2
    /**
     * Records the host name and process id of the running PHP process, then
     * hands the collaborators on to the parent job manager.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         forwarded unchanged to the parent constructor
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        $resolvedHost = gethostname();
        // gethostname() yields false on failure; fall back to an empty string.
        $this->hostname = $resolvedHost ? $resolvedHost : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
48
49
    /**
     * Remembers the positional arguments that performChannelSetup() later
     * passes to the channel's exchange_declare call.
     *
     * @param string $exchange   exchange name; also used when binding the queue and publishing
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $declareArguments = [$exchange, $type, $passive, $durable, $autoDelete];

        $this->exchangeArgs = $declareArguments;
    }
60
61
    /**
     * Validates the configured max priority and, if it is acceptable, stores
     * the positional arguments later passed to the channel's queue_declare call.
     *
     * The max priority is validated BEFORE the arguments are stored, so a call
     * that throws leaves the previously stored queue arguments untouched.
     *
     * @param string $queue      queue name; also used for binding and for fetching messages
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when the max priority is not a non-negative
     *                           integer or does not fit into a PHP integer
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $maxPriorityText = strval($this->maxPriority);

        // Only plain digit strings are accepted (rejects '', negatives, floats, ...).
        if (!ctype_digit($maxPriorityText)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // A digit string that does not survive the integer round trip cannot be
        // represented as a PHP integer (e.g. it exceeds PHP_INT_MAX).
        if (strval(intval($this->maxPriority)) !== $maxPriorityText) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }

        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];
    }
82
83 1
    /**
     * Stores the AMQP connection and opens the channel used for all later
     * exchange/queue operations.
     *
     * NOTE(review): the $channelSetup flag is not reset here, so a channel
     * obtained from a replacement connection is not declared again by
     * setupChannel() — confirm whether reconnecting is a supported use case.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $openedChannel = $connection->channel();
        $this->channel = $openedChannel;
    }
88
89
    /**
     * Gives access to the channel opened by setAMQPConnection().
     *
     * @return AMQPChannel the stored channel (not yet set if setAMQPConnection() has not been called)
     */
    public function getChannel()
    {
        $currentChannel = $this->channel;

        return $currentChannel;
    }
96
97
    /**
     * Ensures both the queue and the exchange arguments were configured
     * before any channel operation is attempted.
     *
     * @throws ArgumentsNotSetException when setQueueArgs(...) or
     *                                  setExchangeArgs(...) has not provided a value yet
     */
    protected function checkChannelArgs()
    {
        // Queue arguments are checked first, so they are reported first when both are missing.
        if (!$this->queueArgs) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }

        if (!$this->exchangeArgs) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }
109
110 1
    /**
     * Declares the exchange and the queue on the channel and binds them.
     *
     * When a max priority is configured, the stored queue arguments are
     * extended with a nowait flag (false) and an 'x-max-priority' argument
     * table. The extended list is kept in $this->queueArgs because
     * getWaitingJobCount() re-declares the queue with the same arguments.
     *
     * The extension is idempotent: if the stored arguments already end with an
     * 'x-max-priority' table (from an earlier run of this method), that table
     * is refreshed in place instead of being appended a second time, which
     * would otherwise produce a malformed queue_declare argument list.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        if ($this->maxPriority) {
            $priorityArguments = ['x-max-priority' => ['I', intval($this->maxPriority)]];
            $lastArgument = end($this->queueArgs);

            if (is_array($lastArgument) && isset($lastArgument['x-max-priority'])) {
                // Already extended by a previous call: only refresh the priority table.
                $this->queueArgs[count($this->queueArgs) - 1] = $priorityArguments;
            } else {
                // First run: append nowait = false followed by the argument table.
                array_push($this->queueArgs, false, $priorityArguments);
            }
        }

        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
        // Index 0 of each argument list is the queue name / exchange name respectively.
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }
120
121
    /**
     * Declares the exchange/queue on first use; later calls only re-validate
     * that the arguments are present.
     *
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function setupChannel()
    {
        // The argument check runs on every call, even once setup is complete.
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }
133
134
    /**
     * Validates the job, assigns it an id and publishes it to the exchange.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of the RabbitMQ Job class
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance that was passed in
     *
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     * @throws PriorityException
     * @throws ArgumentsNotSetException  when queue or exchange arguments are missing
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        // The channel must be declared before anything is published.
        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);
        $this->publishJob($job);

        return $job;
    }
156
157 9
    /**
     * Wraps the job's message representation in an AMQPMessage, applies the
     * priority and publishes it to the configured exchange.
     *
     * @param Job $job
     */
    protected function publishJob(Job $job)
    {
        $amqpMessage = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($amqpMessage, $job);

        // The first exchange argument is the exchange name.
        $exchangeName = $this->exchangeArgs[0];
        $this->channel->basic_publish($amqpMessage, $exchangeName);
    }
164
165
    /**
     * Copies the job's priority onto the AMQP message.
     *
     * Nothing is set when no max priority is configured (null).
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null === $this->maxPriority) {
            return;
        }

        $msg->set('priority', $job->getPriority());
    }
175
176 9
    /**
     * Delegates to the parent calculation and substitutes 0 when the parent
     * yields null.
     *
     * @param mixed $priority value forwarded to the parent implementation
     *
     * @return mixed the parent's result, or 0 if that result is null
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        return null === $calculated ? 0 : $calculated;
    }
185
186
    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired messages reported by findJob() are skipped by polling again
     * until a live job is returned or the queue yields nothing.
     *
     * @param string|null $workerName checked by verifyGetJobArgs()
     * @param string|null $methodName checked by verifyGetJobArgs()
     * @param bool        $prioritize checked by verifyGetJobArgs()
     * @param mixed       $runId      stored on the returned job
     *
     * @return Job|null the next job, or null when no message is available
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        $nextJob = null;
        $sawExpired = true;
        while ($sawExpired) {
            // findJob() flips the flag back to true when it discarded an expired message.
            $sawExpired = false;
            $nextJob = $this->findJob($sawExpired, $runId);
        }

        return $nextJob;
    }
205
206
    /**
     * Pulls a single message from the queue and turns it into a Job.
     *
     * @param bool  $expiredJob output flag, set to true when the fetched message
     *                          belonged to an expired job that was rejected
     * @param mixed $runId      run id stored on the reconstructed job
     *
     * @return Job|null the job, or null when the queue returned no message
     *                  or the message was expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $queueName = $this->queueArgs[0];
        $message = $this->channel->basic_get($queueName);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();
        if ($hasExpired) {
            $expiredJob = true;
            // Reject the message and record the expiry in the timing statistics.
            $this->channel->basic_nack($message->delivery_info['delivery_tag']);
            // The property name (spelling included) is declared outside this method.
            $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

            return null;
        }

        // Keep the delivery tag so the message can be acknowledged on completion.
        $job->setDeliveryTag($message->delivery_info['delivery_tag']);

        return $job;
    }
234
235 2
    /**
     * Resets a job to its initial state, bumps its retry counter and publishes
     * it again.
     *
     * NOTE(review): the delivery tag is cleared here without acknowledging the
     * original delivery in this method — confirm the caller acknowledges it
     * beforehand.
     *
     * @param RetryableJob $job must be an instance of the RabbitMQ Job class
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a RabbitMQ Job
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!($job instanceof Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        // Wipe the state left over from the previous run.
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setDeliveryTag(null);

        $nextRetryCount = $job->getRetries() + 1;
        $job->setRetries($nextRetryCount);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        $this->publishJob($job);

        return true;
    }
250
251
    /**
     * Called upon completion of the job: acknowledges the job's message on the
     * channel when the job still carries a delivery tag.
     *
     * @param RetryableJob $job   must be an instance of the RabbitMQ Job class
     * @param mixed        $retry not used by this implementation
     *
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     */
    protected function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if (!($job instanceof Job)) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }

        $deliveryTag = $job->getDeliveryTag();
        if (null === $deliveryTag) {
            // Nothing to acknowledge (resetJob() clears the tag, for example).
            return;
        }

        $this->channel->basic_ack($deliveryTag);
    }
264
265 3
    /**
     * Reports the message count obtained by re-declaring the queue with the
     * stored queue arguments.
     *
     * Filtering is not available: passing a truthy worker or method name is
     * rejected. The channel is set up before those arguments are inspected.
     *
     * @param string|null $workerName must be empty
     * @param string|null $methodName must be empty
     *
     * @return mixed element 1 of the queue_declare result, or 0 when it is not set
     *
     * @throws UnsupportedException     when $workerName or $methodName is given
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $this->setupChannel();

        if ($workerName) {
            throw new UnsupportedException('Waiting Job Count by $workerName is not supported');
        }

        if ($methodName) {
            throw new UnsupportedException('Waiting Job Count by $methodName is not supported');
        }

        $declareResult = call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
        if (isset($declareResult[1])) {
            return $declareResult[1];
        }

        return 0;
    }
280
281 2
    /**
     * Intentionally empty destructor.
     */
    public function __destruct()
    {
        // Closing the channel here is known to cause problems; otherwise
        // $this->channel->close() would be called at this point.
    }
285
}
286