Completed
Pull Request — master (#30)
by Matthew
23:29 queued 08:10
created

JobManager::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Manager\JobIdTrait;
7
use Dtc\QueueBundle\Model\RetryableJob;
8
use Dtc\QueueBundle\Model\JobTiming;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
11
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
12
use Dtc\QueueBundle\Exception\PriorityException;
13
use Dtc\QueueBundle\Exception\UnsupportedException;
14
use Dtc\QueueBundle\Manager\RunManager;
15
use Dtc\QueueBundle\Manager\JobTimingManager;
16
use Dtc\QueueBundle\Util\Util;
17
use PhpAmqpLib\Channel\AMQPChannel;
18
use PhpAmqpLib\Connection\AbstractConnection;
19
use PhpAmqpLib\Message\AMQPMessage;
20
21
class JobManager extends PriorityJobManager
22
{
23
    use JobIdTrait;
24
25
    /** @var AMQPChannel */
26
    protected $channel;
27
28
    /** @var AbstractConnection */
29
    protected $connection;
30
    protected $queueArgs;
31
    protected $exchangeArgs;
32
33
    protected $channelSetup = false;
34
35
    protected $hostname;
36
    protected $pid;
37
38 2
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
39
    {
40 2
        $this->hostname = gethostname() ?: '';
41 2
        $this->pid = getmypid();
42 2
        parent::__construct($runManager, $jobTimingManager, $jobClass);
43 2
    }
44
45
    /**
46
     * @param string $exchange
47
     * @param string $type
48
     * @param bool   $passive
49
     * @param bool   $durable
50
     * @param bool   $autoDelete
51
     */
52 1
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
53
    {
54 1
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
55 1
    }
56
57
    /**
58
     * @param string $queue
59
     * @param bool   $passive
60
     * @param bool   $durable
61
     * @param bool   $exclusive
62
     * @param bool   $autoDelete
63
     *
64
     * @throws PriorityException
65
     */
66 1
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
67
    {
68 1
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];
69
70 1
        $this->queueArgs = $arguments;
71 1
        if (!ctype_digit(strval($this->maxPriority))) {
72 1
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
73
        }
74 1
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
75
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
76
        }
77 1
    }
78
79 1
    public function setAMQPConnection(AbstractConnection $connection)
80
    {
81 1
        $this->connection = $connection;
82 1
        $this->channel = $connection->channel();
83 1
    }
84
85
    /**
86
     * @return AMQPChannel
87
     */
88
    public function getChannel()
89
    {
90
        return $this->channel;
91
    }
92
93
    /**
94
     * @throws ArgumentsNotSetException
95
     */
96 9
    protected function checkChannelArgs()
97
    {
98 9
        if (empty($this->queueArgs)) {
99 1
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
100
        }
101 9
        if (empty($this->exchangeArgs)) {
102 1
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
103
        }
104 9
    }
105
106 1
    protected function performChannelSetup()
107
    {
108 1
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
109 1
        if ($this->maxPriority) {
110 1
            array_push($this->queueArgs, false);
111 1
            array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
112
        }
113 1
        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
114 1
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
115 1
    }
116
117
    /**
118
     * @throws ArgumentsNotSetException
119
     */
120 9
    public function setupChannel()
121
    {
122 9
        $this->checkChannelArgs();
123
124 9
        if (!$this->channelSetup) {
125 1
            $this->performChannelSetup();
126 1
            $this->channelSetup = true;
127
        }
128 9
    }
129
130
    /**
131
     * @param \Dtc\QueueBundle\Model\Job $job
132
     *
133
     * @return \Dtc\QueueBundle\Model\Job
134
     *
135
     * @throws ClassNotSubclassException
136
     * @throws PriorityException
137
     * @throws ArgumentsNotSetException
138
     */
139 8
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
140
    {
141 8
        if (!$job instanceof Job) {
142
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
143
        }
144
145 8
        $this->setupChannel();
146
147 8
        $this->validateSaveable($job);
148 8
        $this->setJobId($job);
149
150 8
        $this->publishJob($job);
151
152 8
        return $job;
153
    }
154
155 8
    protected function publishJob(Job $job)
156
    {
157 8
        $msg = new AMQPMessage($job->toMessage());
158 8
        $this->setMsgPriority($msg, $job);
159
160 8
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
161 8
    }
162
163
    /**
164
     * Sets the priority of the AMQPMessage.
165
     *
166
     * @param AMQPMessage                $msg
167
     * @param \Dtc\QueueBundle\Model\Job $job
168
     */
169 8
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
170
    {
171 8
        if (null !== $this->maxPriority) {
172 8
            $priority = $job->getPriority();
173 8
            $msg->set('priority', $priority);
174
        }
175 8
    }
176
177 8
    protected function calculatePriority($priority)
178
    {
179 8
        $priority = parent::calculatePriority($priority);
180 8
        if (null === $priority) {
181 7
            return 0;
182
        }
183
184 3
        return $priority;
185
    }
186
187
    /**
188
     * @param \Dtc\QueueBundle\Model\Job $job
189
     *
190
     * @throws PriorityException
191
     * @throws ClassNotSubclassException
192
     */
193 8 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
194
    {
195 8
        if (null !== $job->getPriority() && null === $this->maxPriority) {
196
            throw new PriorityException('This queue does not support priorities');
197
        }
198
199 8
        if (!$job instanceof Job) {
200
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
201
        }
202 8
    }
203
204
    /**
205
     * @param string|null $workerName
206
     * @param string|null $methodName
207
     * @param bool        $prioritize
208
     *
209
     * @throws UnsupportedException
210
     */
211 7
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
212
    {
213 7 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
214 2
            throw new UnsupportedException('Unsupported');
215
        }
216 6
    }
217
218
    /**
219
     * @param string|null $workerName
220
     * @param string|null $methodName
221
     *
222
     * @throws UnsupportedException
223
     * @throws ArgumentsNotSetException
224
     */
225 7
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
226
    {
227 7
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
228 6
        $this->setupChannel();
229
230
        do {
231 6
            $expiredJob = false;
232 6
            $job = $this->findJob($expiredJob, $runId);
233 6
        } while ($expiredJob);
234
235 6
        return $job;
236
    }
237
238
    /**
239
     * @param bool $expiredJob
240
     * @param $runId
241
     *
242
     * @return Job|null
243
     */
244 6
    protected function findJob(&$expiredJob, $runId)
245
    {
246 6
        $message = $this->channel->basic_get($this->queueArgs[0]);
247 6
        if ($message) {
248 6
            $job = new Job();
249 6
            $job->fromMessage($message->body);
250 6
            $job->setRunId($runId);
251
252 6
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
253 1
                $expiredJob = true;
254 1
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
255 1
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
256
257 1
                return null;
258
            }
259 6
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);
260
261 6
            return $job;
262
        }
263
264 3
        return null;
265
    }
266
267 2 View Code Duplication
    protected function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
268
    {
269 2
        if (!$job instanceof Job) {
270
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
271
        }
272 2
        $job->setStatus(BaseJob::STATUS_NEW);
273 2
        $job->setMessage(null);
274 2
        $job->setStartedAt(null);
275 2
        $job->setRetries($job->getRetries() + 1);
276 2
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
277 2
        $this->publishJob($job);
278
279 2
        return true;
280
    }
281
282
    // Save History get called upon completion of the job
283 3
    protected function retryableSaveHistory(RetryableJob $job, $retry)
284
    {
285 3
        if (!$job instanceof Job) {
286
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
287
        }
288 3
        $deliveryTag = $job->getDeliveryTag();
289 3
        $this->channel->basic_ack($deliveryTag);
290
291 3
        return;
292
    }
293
294 2
    public function __destruct()
295
    {
296
        // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point.
297 2
    }
298
}
299