Completed
Push — master ( 8ffdf8...f67228 )
by Matthew
08:43
created

JobManager::validateSaveable()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 10
Code Lines 5

Duplication

Lines 10
Ratio 100 %

Code Coverage

Tests 4
CRAP Score 4.5923

Importance

Changes 0
Metric Value
dl 10
loc 10
ccs 4
cts 6
cp 0.6667
rs 9.2
c 0
b 0
f 0
cc 4
eloc 5
nc 3
nop 1
crap 4.5923
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Manager\SaveableTrait;
6
use Dtc\QueueBundle\Manager\VerifyTrait;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Manager\PriorityJobManager;
12
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
13
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
14
use Dtc\QueueBundle\Exception\PriorityException;
15
use Dtc\QueueBundle\Exception\UnsupportedException;
16
use Dtc\QueueBundle\Manager\RunManager;
17
use Dtc\QueueBundle\Manager\JobTimingManager;
18
use Dtc\QueueBundle\Util\Util;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Connection\AbstractConnection;
21
use PhpAmqpLib\Message\AMQPMessage;
22
23
/**
 * Job manager that stores and retrieves jobs through a RabbitMQ (AMQP) channel.
 *
 * Exchange and queue declaration arguments must be supplied via
 * setExchangeArgs() / setQueueArgs() and a connection via setAMQPConnection()
 * before jobs can be saved or fetched.
 */
class JobManager extends PriorityJobManager
{
    use JobIdTrait;
    use VerifyTrait;
    use SaveableTrait;

    /** @var AMQPChannel Channel obtained from the connection in setAMQPConnection(). */
    protected $channel;

    /** @var AbstractConnection */
    protected $connection;

    /** @var array|null Positional arguments passed to AMQPChannel::queue_declare(); index 0 is the queue name. */
    protected $queueArgs;

    /** @var array|null Positional arguments passed to AMQPChannel::exchange_declare(); index 0 is the exchange name. */
    protected $exchangeArgs;

    /** @var bool True once the exchange and queue have been declared and bound by this instance. */
    protected $channelSetup = false;

    /** @var string Host name captured at construction ('' when gethostname() fails). */
    protected $hostname;

    /** @var int|false Process id captured at construction. */
    protected $pid;

    /**
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param string           $jobClass
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        // gethostname() returns false on failure; normalise that to an empty string.
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }

    /**
     * Stores the arguments later used to declare the exchange.
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the arguments later used to declare the queue.
     *
     * The max priority is validated first, so that a rejected value never
     * leaves queue arguments behind: otherwise a later setupChannel() would
     * pass checkChannelArgs() and declare the queue with the invalid priority.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when the max priority is not a non-negative integer
     *                           that survives an int round trip
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        // NOTE(review): maxPriority is not declared in this class; presumably inherited
        // from PriorityJobManager - confirm.
        $maxPriority = strval($this->maxPriority);
        if (!ctype_digit($maxPriority)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // The int round trip fails for values beyond PHP_INT_MAX (and also for
        // digit strings with leading zeros).
        if (strval(intval($this->maxPriority)) !== $maxPriority) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }

        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];
    }

    /**
     * Stores the connection and opens the channel used for all queue operations.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    /**
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Ensures both queue and exchange arguments have been provided.
     *
     * @throws ArgumentsNotSetException
     */
    protected function checkChannelArgs()
    {
        if (empty($this->queueArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }

    /**
     * Declares the exchange and the queue, then binds the queue to the exchange.
     *
     * When a max priority is configured, two extra positional arguments are
     * appended to the stored queue args (false, then an 'x-max-priority'
     * argument table). The stored args are modified in place, so later
     * declarations (see getWaitingJobCount()) reuse the same argument list.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
        if ($this->maxPriority) {
            array_push($this->queueArgs, false);
            array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
        }
        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }

    /**
     * Declares the exchange/queue once per instance; subsequent calls only re-check the arguments.
     *
     * @throws ArgumentsNotSetException
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if (!$this->channelSetup) {
            $this->performChannelSetup();
            $this->channelSetup = true;
        }
    }

    /**
     * Validates the job, assigns it an id and publishes it to the exchange.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of this namespace's Job class
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance
     *
     * @throws ClassNotSubclassException
     * @throws PriorityException
     * @throws ArgumentsNotSetException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        // validateSaveable() and setJobId() are not defined in this class;
        // presumably provided by SaveableTrait / JobIdTrait.
        $this->validateSaveable($job);
        $this->setJobId($job);

        $this->publishJob($job);

        return $job;
    }

    /**
     * Serialises the job into an AMQP message and publishes it to the configured exchange.
     *
     * @param Job $job
     */
    protected function publishJob(Job $job)
    {
        $msg = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($msg, $job);

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
    }

    /**
     * Sets the priority of the AMQPMessage.
     *
     * The job's priority is copied onto the message only when a max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $this->maxPriority) {
            $priority = $job->getPriority();
            $msg->set('priority', $priority);
        }
    }

    /**
     * Delegates to the parent calculation, substituting 0 when the parent yields null.
     *
     * @param mixed $priority
     *
     * @return mixed the parent's result, or 0 when that result is null
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);
        if (null === $priority) {
            return 0;
        }

        return $priority;
    }

    /**
     * Fetches the next non-expired job from the queue, if any.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      run identifier stored on the returned job
     *
     * @return Job|null null when the queue has no message available
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        // Keep pulling messages while the one just fetched turned out to be expired.
        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Pulls a single message from the queue and turns it into a Job.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job had already expired
     * @param mixed $runId      run identifier stored on the job
     *
     * @return Job|null null when no message is available or the fetched job was expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
            $expiredJob = true;
            // Negative-acknowledge the expired message (no requeue flag is passed).
            $this->channel->basic_nack($message->delivery_info['delivery_tag']);
            // NOTE(review): "jobTiminigManager" looks misspelled but is not declared in this
            // class; presumably it matches the parent's property name - confirm before renaming.
            $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

            return null;
        }
        // Remember the delivery tag so the message can be acknowledged once the job completes.
        $job->setDeliveryTag($message->delivery_info['delivery_tag']);

        return $job;
    }

    /**
     * Resets a job to its "new" state, bumps its retry counter and republishes it.
     *
     * NOTE(review): static analysis reports this method as duplicated elsewhere in
     * the project; consider extracting the shared logic.
     *
     * @param RetryableJob $job must be an instance of this namespace's Job class
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setDeliveryTag(null);
        $job->setRetries($job->getRetries() + 1);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
        $this->publishJob($job);

        return true;
    }

    /**
     * Called upon completion of the job: acknowledges the job's message when it carries a delivery tag.
     *
     * @param RetryableJob $job   must be an instance of this namespace's Job class
     * @param mixed        $retry not used by this implementation
     *
     * @throws ClassNotSubclassException
     */
    protected function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }
        $deliveryTag = $job->getDeliveryTag();
        if (null !== $deliveryTag) {
            $this->channel->basic_ack($deliveryTag);
        }
    }

    /**
     * Returns the number of messages waiting in the queue.
     *
     * Filtering by worker or method name is not supported.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return int
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $this->setupChannel();

        if ($workerName) {
            throw new UnsupportedException('Waiting Job Count by $workerName is not supported');
        }
        if ($methodName) {
            throw new UnsupportedException('Waiting Job Count by $methodName is not supported');
        }

        // Re-declaring the queue returns its status; index 1 of the result is
        // treated as the message count (falls back to 0 when absent).
        $count = call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);

        return isset($count[1]) ? $count[1] : 0;
    }

    public function __destruct()
    {
        // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point.
    }
}
291