Passed
Pull Request — master (#57)
by Matthew
07:36
created

JobManager::setAMQPConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 3
cts 3
cp 1
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Manager\SaveableTrait;
6
use Dtc\QueueBundle\Manager\VerifyTrait;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Manager\PriorityJobManager;
12
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
13
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
14
use Dtc\QueueBundle\Exception\PriorityException;
15
use Dtc\QueueBundle\Exception\UnsupportedException;
16
use Dtc\QueueBundle\Manager\RunManager;
17
use Dtc\QueueBundle\Manager\JobTimingManager;
18
use Dtc\QueueBundle\Util\Util;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Connection\AbstractConnection;
21
use PhpAmqpLib\Message\AMQPMessage;
22
23
class JobManager extends PriorityJobManager
24
{
25
    use JobIdTrait;
26
    use VerifyTrait;
27
    use SaveableTrait;
28
29
    /** @var AMQPChannel Channel obtained from the connection in setAMQPConnection() */
30
    protected $channel;
31
32
    /** @var AbstractConnection Connection handed to setAMQPConnection() */
33
    protected $connection;
34
    /**
     * @var array|null Positional arguments for the channel's queue_declare() call:
     *                 [queue, passive, durable, exclusive, autoDelete], assigned by setQueueArgs();
     *                 performChannelSetup() appends two further entries when a max priority is set
     */
    protected $queueArgs;
35
    /**
     * @var array|null Positional arguments for the channel's exchange_declare() call:
     *                 [exchange, type, passive, durable, autoDelete], assigned by setExchangeArgs()
     */
    protected $exchangeArgs;
36
37
    /** @var bool Set to true by setupChannel() once performChannelSetup() has run */
    protected $channelSetup = false;
38
39
    /** @var string Result of gethostname() at construction time, or '' when that result is falsy */
    protected $hostname;
40
    /** @var int|false Result of getmypid() at construction time */
    protected $pid;
41
42 2
    /**
     * Records the host name and process id of the current process, then
     * forwards all three arguments to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         passed through unchanged to the parent constructor
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        // gethostname() may return false; any falsy result is normalised to an empty string.
        $detectedHost = gethostname();
        $this->hostname = $detectedHost ? $detectedHost : '';

        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
48
49
    /**
     * Remembers the exchange declaration arguments.
     *
     * The values are kept in the order given; performChannelSetup() later
     * passes them positionally to the channel's exchange_declare(), and the
     * first entry is used as the exchange name when binding and publishing.
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $declareArgs = [
            $exchange,
            $type,
            $passive,
            $durable,
            $autoDelete,
        ];

        $this->exchangeArgs = $declareArgs;
    }
60
61
    /**
     * Validates the configured max priority and, only if it is valid, stores
     * the queue declaration arguments.
     *
     * The arguments are kept in the order given; performChannelSetup() later
     * passes them positionally to the channel's queue_declare(), and the first
     * entry is used as the queue name.
     *
     * Validation runs before the assignment so that a call which throws leaves
     * any previously stored queue arguments untouched.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when $this->maxPriority is not made up solely of decimal digits,
     *                           or does not survive a round-trip through intval()
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $maxPriorityText = strval($this->maxPriority);

        // Rejects null, '', negative numbers, and anything non-numeric.
        if (!ctype_digit($maxPriorityText)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }

        // A digit string that changes when converted to int and back does not fit in a PHP integer.
        if (strval(intval($this->maxPriority)) !== $maxPriorityText) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }

        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];
    }
82
83 1
    /**
     * Stores the connection and opens the channel used by every other
     * operation of this manager.
     *
     * NOTE(review): the channelSetup flag is not reset here, so if a second
     * connection is supplied after setupChannel() has already run, the new
     * channel will not have the exchange/queue declared on it by this class.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $openedChannel = $this->connection->channel();
        $this->channel = $openedChannel;
    }
88
89
    /**
     * Gives access to the channel opened in setAMQPConnection().
     *
     * @return AMQPChannel the stored channel (the property is unset until setAMQPConnection() has been called)
     */
    public function getChannel()
    {
        $currentChannel = $this->channel;

        return $currentChannel;
    }
96
97
    /**
     * Ensures both the queue and the exchange arguments have been provided.
     *
     * Queue arguments are checked first, so that is the error reported when
     * both are missing.
     *
     * @throws ArgumentsNotSetException when either set of arguments is empty
     */
    protected function checkChannelArgs()
    {
        $origin = __METHOD__;

        if (!$this->queueArgs) {
            throw new ArgumentsNotSetException($origin.': queue args need to be set via setQueueArgs(...)');
        }

        if (!$this->exchangeArgs) {
            throw new ArgumentsNotSetException($origin.': exchange args need to be set via setExchangeArgs(...)');
        }
    }
109
110 1
    /**
     * Declares the exchange and the queue on the channel and binds them.
     *
     * When a max priority is configured, two extra positional arguments are
     * appended to the stored queue arguments: `false` and an arguments table
     * carrying 'x-max-priority'. They are stored on the property (not a local
     * copy) because getWaitingJobCount() re-declares the queue with
     * $this->queueArgs and therefore needs the same argument list.
     *
     * The append is guarded so it happens at most once: running this method
     * again must not keep growing $this->queueArgs, which would shift
     * duplicated values into later queue_declare() parameters.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Index 5 is the first slot after the five values stored by setQueueArgs().
        $priorityArgsPresent = array_key_exists(5, $this->queueArgs);
        if ($this->maxPriority && !$priorityArgsPresent) {
            // Sixth positional argument of queue_declare() (nowait in php-amqplib — confirm against the installed version).
            $this->queueArgs[] = false;
            $this->queueArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }

        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }
120
121
    /**
     * Verifies the channel arguments and performs the one-time channel setup.
     *
     * The argument check runs on every call; the declaration work itself is
     * skipped once the channelSetup flag has been set.
     *
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }
133
134
    /**
     * Publishes a job to the exchange.
     *
     * The channel is set up first, then the job is passed through
     * validateSaveable() and setJobId() before being published.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of this namespace's Job class
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance that was passed in
     *
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     * @throws PriorityException
     * @throws ArgumentsNotSetException  when queue or exchange arguments are missing
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job instanceof Job) {
            $this->setupChannel();

            $this->validateSaveable($job);
            $this->setJobId($job);

            $this->publishJob($job);

            return $job;
        }

        throw new ClassNotSubclassException('Must be derived from '.Job::class);
    }
158
159 9
    /**
     * Wraps the job's serialized form in an AMQP message, applies the message
     * priority and publishes it to the configured exchange.
     *
     * @param Job $job
     */
    protected function publishJob(Job $job)
    {
        $payload = $job->toMessage();
        $amqpMessage = new AMQPMessage($payload);
        $this->setMsgPriority($amqpMessage, $job);

        // The first exchange argument is the exchange name.
        $exchangeName = $this->exchangeArgs[0];
        $this->channel->basic_publish($amqpMessage, $exchangeName);
    }
166
167
    /**
     * Copies the job's priority onto the AMQP message.
     *
     * Nothing is set when no max priority is configured (maxPriority is null).
     *
     * @param AMQPMessage                $msg message that receives the 'priority' property
     * @param \Dtc\QueueBundle\Model\Job $job job whose priority is read
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null === $this->maxPriority) {
            return;
        }

        $msg->set('priority', $job->getPriority());
    }
180
181 9
    /**
     * Delegates to the parent's priority calculation, substituting 0 when the
     * parent yields null.
     *
     * @param mixed $priority value forwarded unchanged to the parent implementation
     *
     * @return mixed the parent's result, or 0 in place of null
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        return null === $calculated ? 0 : $calculated;
    }
190
191
    /**
     * Fetches the next job from the queue, skipping over expired ones.
     *
     * The arguments are first checked by verifyGetJobArgs(), then the channel
     * is set up. findJob() is called repeatedly for as long as it reports that
     * the message it pulled had expired.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      stored on the returned job
     *
     * @return Job|null the next non-expired job, or null when findJob() finds no message
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        // Seeded with true so the first fetch always happens; findJob() sets it by reference.
        $expiredJob = true;
        while ($expiredJob) {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        }

        return $job;
    }
210
211
    /**
     * Pulls a single message from the queue and turns it into a job.
     *
     * An expired job is negatively acknowledged (no requeue flag is passed),
     * an expired-timing entry is recorded, $expiredJob is set to true and null
     * is returned. Otherwise the message's delivery tag is stored on the job
     * so it can be acknowledged later.
     *
     * @param bool  $expiredJob set to true (by reference) when the pulled job had expired; left untouched otherwise
     * @param mixed $runId      stored on the job via setRunId()
     *
     * @return Job|null the job, or null when the queue returned no message or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();

        if (!$hasExpired) {
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);

            return $job;
        }

        $expiredJob = true;
        $this->channel->basic_nack($message->delivery_info['delivery_tag']);
        // NOTE(review): "jobTiminigManager" (sic) is presumably the spelling declared on a parent class — verify before renaming.
        $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

        return null;
    }
239
240 2
    /**
     * Returns a job to its initial state and publishes it again.
     *
     * Status is set back to new, the message, start time and delivery tag are
     * cleared, the retry counter is incremented and the update time refreshed
     * before the job is re-published.
     *
     * @param RetryableJob $job must be an instance of this namespace's Job class
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a RabbitMQ Job
     */
    protected function resetJob(RetryableJob $job)
    {
        if ($job instanceof Job) {
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setMessage(null);
            $job->setStartedAt(null);
            $job->setDeliveryTag(null);

            $nextRetryCount = $job->getRetries() + 1;
            $job->setRetries($nextRetryCount);

            $job->setUpdatedAt(Util::getMicrotimeDateTime());
            $this->publishJob($job);

            return true;
        }

        throw new \InvalidArgumentException('$job must be instance of '.Job::class);
    }
255
256
    /**
     * Acknowledges the job's message on the channel; called upon completion of the job.
     *
     * Jobs without a delivery tag (resetJob() clears it to null) are left
     * unacknowledged. The $retry argument is not used by this implementation.
     *
     * @param RetryableJob $job   must be an instance of this namespace's Job class
     * @param mixed        $retry unused here
     *
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     */
    protected function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if (!($job instanceof Job)) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }

        $tag = $job->getDeliveryTag();
        if (null === $tag) {
            return;
        }

        $this->channel->basic_ack($tag);
    }
269
270 3
    /**
     * Reports how many jobs are waiting in the queue.
     *
     * The queue is re-declared with the stored queue arguments and the second
     * element of the declare result is returned (presumably the message count —
     * confirm against the php-amqplib version in use). Filtering by worker or
     * method name is not supported.
     *
     * @param string|null $workerName must be empty
     * @param string|null $methodName must be empty
     *
     * @return int|mixed element 1 of the queue_declare() result, or 0 when it is not set
     *
     * @throws UnsupportedException     when $workerName or $methodName is given
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        // Channel setup intentionally precedes the argument checks, matching the established call order.
        $this->setupChannel();

        if ($workerName) {
            throw new UnsupportedException('Waiting Job Count by $workerName is not supported');
        }
        if ($methodName) {
            throw new UnsupportedException('Waiting Job Count by $methodName is not supported');
        }

        $declareResult = call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
        if (isset($declareResult[1])) {
            return $declareResult[1];
        }

        return 0;
    }
285
286 2
    /**
     * Intentionally performs no cleanup.
     */
    public function __destruct()
    {
        // Closing the channel here ($this->channel->close()) is known to cause some kind of problem, so it is deliberately not done.
    }
290
}
291