Completed
Pull Request — master (#30)
by Matthew
07:24
created

JobManager::findJob()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 22
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 14
cts 14
cp 1
rs 8.9197
c 0
b 0
f 0
cc 4
eloc 14
nc 3
nop 2
crap 4
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Manager\PriorityJobManager;
9
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
10
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
11
use Dtc\QueueBundle\Exception\PriorityException;
12
use Dtc\QueueBundle\Exception\UnsupportedException;
13
use Dtc\QueueBundle\Manager\RunManager;
14
use Dtc\QueueBundle\Manager\JobTimingManager;
15
use Dtc\QueueBundle\Util\Util;
16
use PhpAmqpLib\Channel\AMQPChannel;
17
use PhpAmqpLib\Connection\AbstractConnection;
18
use PhpAmqpLib\Message\AMQPMessage;
19
20
class JobManager extends PriorityJobManager
21
{
22
    /** @var AMQPChannel */
23
    protected $channel;
24
25
    /** @var AbstractConnection */
26
    protected $connection;
27
    protected $queueArgs;
28
    protected $exchangeArgs;
29
30
    protected $channelSetup = false;
31
32
    protected $hostname;
33
    protected $pid;
34
35 2
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
36
    {
37 2
        $this->hostname = gethostname() ?: '';
38 2
        $this->pid = getmypid();
39 2
        parent::__construct($runManager, $jobTimingManager, $jobClass);
40 2
    }
41
42
    /**
43
     * @param string $exchange
44
     * @param string $type
45
     * @param bool   $passive
46
     * @param bool   $durable
47
     * @param bool   $autoDelete
48
     */
49 1
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
50
    {
51 1
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
52 1
    }
53
54
    /**
55
     * @param string $queue
56
     * @param bool   $passive
57
     * @param bool   $durable
58
     * @param bool   $exclusive
59
     * @param bool   $autoDelete
60
     *
61
     * @throws PriorityException
62
     */
63 1
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
64
    {
65 1
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];
66
67 1
        $this->queueArgs = $arguments;
68 1
        if (!ctype_digit(strval($this->maxPriority))) {
69 1
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
70
        }
71 1
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
72
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
73
        }
74 1
    }
75
76 1
    public function setAMQPConnection(AbstractConnection $connection)
77
    {
78 1
        $this->connection = $connection;
79 1
        $this->channel = $connection->channel();
80 1
    }
81
82
    /**
83
     * @return AMQPChannel
84
     */
85
    public function getChannel()
86
    {
87
        return $this->channel;
88
    }
89
90
    /**
91
     * @throws ArgumentsNotSetException
92
     */
93 9
    protected function checkChannelArgs()
94
    {
95 9
        if (empty($this->queueArgs)) {
96 1
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
97
        }
98 9
        if (empty($this->exchangeArgs)) {
99 1
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
100
        }
101 9
    }
102
103 1
    protected function performChannelSetup()
104
    {
105 1
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
106 1
        if ($this->maxPriority) {
107 1
            array_push($this->queueArgs, false);
108 1
            array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
109
        }
110 1
        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
111 1
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
112 1
    }
113
114
    /**
115
     * @throws ArgumentsNotSetException
116
     */
117 9
    public function setupChannel()
118
    {
119 9
        $this->checkChannelArgs();
120
121 9
        if (!$this->channelSetup) {
122 1
            $this->performChannelSetup();
123 1
            $this->channelSetup = true;
124
        }
125 9
    }
126
127
    /**
128
     * @param \Dtc\QueueBundle\Model\Job $job
129
     *
130
     * @return \Dtc\QueueBundle\Model\Job
131
     *
132
     * @throws ClassNotSubclassException
133
     * @throws PriorityException
134
     * @throws ArgumentsNotSetException
135
     */
136 8
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
137
    {
138 8
        if (!$job instanceof Job) {
139
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
140
        }
141
142 8
        $this->setupChannel();
143
144 8
        $this->validateSaveable($job);
145 8
        $this->setJobId($job);
146
147 8
        $this->publishJob($job);
148
149 8
        return $job;
150
    }
151
152 8
    protected function publishJob(Job $job)
153
    {
154 8
        $msg = new AMQPMessage($job->toMessage());
155 8
        $this->setMsgPriority($msg, $job);
156
157 8
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
158 8
    }
159
160
    /**
161
     * Attach a unique id to a job since RabbitMQ will not.
162
     *
163
     * @param \Dtc\QueueBundle\Model\Job $job
164
     */
165 8
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
166
    {
167 8 View Code Duplication
        if (!$job->getId()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
168 8
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
169
        }
170 8
    }
171
172
    /**
173
     * Sets the priority of the AMQPMessage.
174
     *
175
     * @param AMQPMessage                $msg
176
     * @param \Dtc\QueueBundle\Model\Job $job
177
     */
178 8
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
179
    {
180 8
        if (null !== $this->maxPriority) {
181 8
            $priority = $job->getPriority();
182 8
            $msg->set('priority', $priority);
183
        }
184 8
    }
185
186 8
    protected function calculatePriority($priority)
187
    {
188 8
        $priority = parent::calculatePriority($priority);
189 8
        if (null === $priority) {
190 7
            return 0;
191
        }
192
193 3
        return $priority;
194
    }
195
196
    /**
197
     * @param \Dtc\QueueBundle\Model\Job $job
198
     *
199
     * @throws PriorityException
200
     * @throws ClassNotSubclassException
201
     */
202 8 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
203
    {
204 8
        if (null !== $job->getPriority() && null === $this->maxPriority) {
205
            throw new PriorityException('This queue does not support priorities');
206
        }
207
208 8
        if (!$job instanceof Job) {
209
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
210
        }
211 8
    }
212
213
    /**
214
     * @param null $workerName
215
     * @param null $methodName
216
     * @param bool $prioritize
217
     *
218
     * @throws UnsupportedException
219
     */
220 7
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
221
    {
222 7 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
223 2
            throw new UnsupportedException('Unsupported');
224
        }
225 6
    }
226
227
    /**
228
     * @param string $workerName
229
     *
230
     * @throws UnsupportedException
231
     * @throws ArgumentsNotSetException
232
     */
233 7
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
234
    {
235 7
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 233 can also be of type string; however, Dtc\QueueBundle\RabbitMQ...ger::verifyGetJobArgs() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
236 6
        $this->setupChannel();
237
238
        do {
239 6
            $expiredJob = false;
240 6
            $job = $this->findJob($expiredJob, $runId);
241 6
        } while ($expiredJob);
242
243 6
        return $job;
244
    }
245
246
    /**
247
     * @param bool $expiredJob
248
     * @param $runId
249
     *
250
     * @return Job|null
251
     */
252 6
    protected function findJob(&$expiredJob, $runId)
253
    {
254 6
        $message = $this->channel->basic_get($this->queueArgs[0]);
255 6
        if ($message) {
256 6
            $job = new Job();
257 6
            $job->fromMessage($message->body);
258 6
            $job->setRunId($runId);
259
260 6
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
261 1
                $expiredJob = true;
262 1
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
263 1
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
264
265 1
                return null;
266
            }
267 6
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);
268
269 6
            return $job;
270
        }
271
272 3
        return null;
273
    }
274
275 2 View Code Duplication
    protected function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
276
    {
277 2
        if (!$job instanceof Job) {
278
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
279
        }
280 2
        $job->setStatus(BaseJob::STATUS_NEW);
281 2
        $job->setMessage(null);
282 2
        $job->setStartedAt(null);
283 2
        $job->setRetries($job->getRetries() + 1);
284 2
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
285 2
        $this->publishJob($job);
286
287 2
        return true;
288
    }
289
290
    // Save History get called upon completion of the job
291 3
    protected function retryableSaveHistory(RetryableJob $job, $retry)
292
    {
293 3
        if (!$job instanceof Job) {
294
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
295
        }
296 3
        $deliveryTag = $job->getDeliveryTag();
297 3
        $this->channel->basic_ack($deliveryTag);
298
299 3
        return;
300
    }
301
302 2
    public function __destruct()
303
    {
304
        // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point.
305 2
    }
306
}
307