Completed
Pull Request — master (#30)
by Matthew
18:57 queued 16:20
created

JobManager::getJob()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 12
ccs 7
cts 7
cp 1
rs 9.4285
cc 2
eloc 8
nc 1
nop 4
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Manager\PriorityJobManager;
9
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
10
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
11
use Dtc\QueueBundle\Exception\PriorityException;
12
use Dtc\QueueBundle\Exception\UnsupportedException;
13
use Dtc\QueueBundle\Manager\RunManager;
14
use Dtc\QueueBundle\Manager\JobTimingManager;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Connection\AbstractConnection;
17
use PhpAmqpLib\Message\AMQPMessage;
18
19
/**
 * Job manager that publishes jobs to, and fetches jobs from, RabbitMQ through
 * a php-amqplib channel.
 *
 * Before jobs can be saved or fetched the manager needs:
 *  - an AMQP connection (setAMQPConnection()),
 *  - exchange arguments (setExchangeArgs()),
 *  - queue arguments (setQueueArgs()).
 *
 * The exchange and queue are declared and bound lazily, once, by setupChannel().
 *
 * Unqualified references to `Job` in this class resolve to the Job class of the
 * current namespace (Dtc\QueueBundle\RabbitMQ\Job).
 */
class JobManager extends PriorityJobManager
{
    /** @var AMQPChannel */
    protected $channel;

    /** @var AbstractConnection */
    protected $connection;

    /** @var array|null Positional arguments [queue, passive, durable, exclusive, autoDelete]; see setQueueArgs() */
    protected $queueArgs;

    /** @var array|null Positional arguments [exchange, type, passive, durable, autoDelete]; see setExchangeArgs() */
    protected $exchangeArgs;

    /** @var bool True once the exchange/queue have been declared and bound by setupChannel() */
    protected $channelSetup = false;

    /** @var string Host name used as part of generated job ids ('' when gethostname() fails) */
    protected $hostname;

    /** @var int|false Process id used as part of generated job ids */
    protected $pid;

    /**
     * Captures the host name and process id (used by setJobId()) and delegates
     * the rest of the construction to the parent manager.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param string           $jobClass
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }

    /**
     * Stores the positional arguments later passed to AMQPChannel::exchange_declare().
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the positional arguments later passed to AMQPChannel::queue_declare()
     * and validates the configured max priority.
     *
     * The arguments are stored before the validation runs, so they remain set
     * even when a PriorityException is thrown.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when max priority is not a string of digits, or does
     *                           not survive an int round-trip (e.g. exceeds PHP_INT_MAX)
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $maxPriority = strval($this->maxPriority);
        if (!ctype_digit($maxPriority)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // A digit string that changes when converted to int and back cannot be represented as an int.
        if (strval(intval($this->maxPriority)) !== $maxPriority) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }

    /**
     * Stores the connection and opens the channel used for all AMQP operations.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    /**
     * @return AMQPChannel the channel opened by setAMQPConnection() (unset until that is called)
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Ensures both the queue and the exchange arguments have been provided.
     *
     * @throws ArgumentsNotSetException
     */
    protected function checkChannelArgs()
    {
        if (empty($this->queueArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }

    /**
     * Declares the exchange and the queue, then binds the queue to the exchange.
     *
     * When a (truthy) max priority is configured, the queue is declared with an
     * 'x-max-priority' argument.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Build the declare arguments on a copy: appending to $this->queueArgs would
        // make a repeated call pass the extra arguments twice, in the wrong positions.
        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Positional filler for the argument that precedes the arguments table
            // (nowait in php-amqplib's queue_declare signature).
            $declareArgs[] = false;
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);

        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }

    /**
     * Validates the channel arguments and performs the one-time exchange/queue setup.
     *
     * @throws ArgumentsNotSetException
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }

    /**
     * Validates the job, assigns it an id if it has none, and publishes it.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance
     *
     * @throws ClassNotSubclassException
     * @throws PriorityException
     * @throws ArgumentsNotSetException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);

        $this->publishJob($job);

        return $job;
    }

    /**
     * Wraps the job's message representation in an AMQPMessage and publishes it
     * to the configured exchange.
     *
     * @param Job $job
     */
    protected function publishJob(Job $job)
    {
        $msg = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($msg, $job);

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
    }

    /**
     * Attach a unique id to a job since RabbitMQ will not.
     *
     * The id is only generated when the job does not already have one; it is
     * prefixed with "<hostname>-<pid>".
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
    }

    /**
     * Sets the priority of the AMQPMessage.
     *
     * The 'priority' property is only set when a max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $this->maxPriority) {
            $msg->set('priority', $job->getPriority());
        }
    }

    /**
     * Delegates to the parent calculation, mapping a null result to 0.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        return null === $calculated ? 0 : $calculated;
    }

    /**
     * Checks that the job can be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but no max priority is configured
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $job->getPriority() && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }

    /**
     * Rejects getJob() argument combinations this manager does not support:
     * any worker name, any method name, or (when a max priority is configured)
     * a $prioritize value other than true.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
            throw new UnsupportedException('Unsupported');
        }
    }

    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired jobs encountered along the way are discarded by findJob() and the
     * fetch is retried until a usable job is found or the queue yields nothing.
     *
     * @param string|null $workerName must be null (filtering is unsupported)
     * @param string|null $methodName must be null (filtering is unsupported)
     * @param bool        $prioritize must be true when a max priority is configured
     * @param mixed       $runId      run id stamped on the returned job
     *
     * @return Job|null
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Pulls a single message from the queue and converts it into a Job.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job had expired
     * @param mixed $runId      run id stamped on the job
     *
     * @return Job|null null when no message was available or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $deliveryTag = $message->delivery_info['delivery_tag'];
        $expiresAt = $job->getExpiresAt();
        if ($expiresAt && $expiresAt->getTimestamp() < time()) {
            $expiredJob = true;
            $this->channel->basic_nack($deliveryTag);
            // NOTE(review): 'jobTiminigManager' (sic) is not declared in this class;
            // presumably it is inherited with this spelling - confirm against the
            // parent class before renaming.
            $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

            return null;
        }

        // The delivery tag is needed later to acknowledge the message (see retryableSaveHistory()).
        $job->setDeliveryTag($deliveryTag);

        return $job;
    }

    /**
     * Resets a job to the "new" state, bumps its retry counter and republishes it.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when the job is not a RabbitMQ Job
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($job->getRetries() + 1);
        $job->setUpdatedAt(new \DateTime());
        $this->publishJob($job);

        return true;
    }

    /**
     * Save History get called upon completion of the job.
     *
     * Acknowledges the job's message using the delivery tag stored on the job.
     * The $retry argument is not used here.
     *
     * @param RetryableJob $job
     * @param bool         $retry
     *
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     */
    protected function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }

        $this->channel->basic_ack($job->getDeliveryTag());
    }

    /**
     * Intentionally empty.
     */
    public function __destruct()
    {
        // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point.
    }
}
306