Completed
Push — master ( 2db41f...aa6ae6 )
by Matthew
15:06 queued 36s
created

JobManager::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\JobTiming;
6
use Dtc\QueueBundle\Model\PriorityJobManager;
7
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
8
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
9
use Dtc\QueueBundle\Exception\PriorityException;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\JobTimingManager;
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Connection\AbstractConnection;
15
use PhpAmqpLib\Message\AMQPMessage;
16
17
/**
 * RabbitMQ-backed job manager.
 *
 * Publishes jobs as AMQP messages to a configured exchange and fetches them
 * back from a configured queue. The exchange and queue are declared and bound
 * lazily, the first time a job is saved or fetched (see setupChannel()).
 */
class JobManager extends PriorityJobManager
{
    /** @var AMQPChannel|null Channel obtained from the connection given to setAMQPConnection() */
    protected $channel;

    /** @var AbstractConnection|null Connection given to setAMQPConnection() */
    protected $connection;

    /** @var array|null Positional arguments for queue_declare, as stored by setQueueArgs() */
    protected $queueArgs;

    /** @var array|null Positional arguments for exchange_declare, as stored by setExchangeArgs() */
    protected $exchangeArgs;

    /** @var bool Whether the exchange/queue have been declared and bound on the current channel */
    protected $channelSetup = false;

    /** @var string Result of gethostname(), or '' when that call returns a falsy value; used in generated job ids */
    protected $hostname;

    /** @var int|false Result of getmypid(); used in generated job ids */
    protected $pid;

    /**
     * Records the host name and process id (used later to build job ids) and
     * forwards the remaining arguments to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param string           $jobClass
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }

    /**
     * Stores the positional arguments later passed to exchange_declare.
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the positional arguments later passed to queue_declare and
     * validates the configured max priority.
     *
     * The arguments are stored before validation runs, so they remain set
     * even when a PriorityException is thrown.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when $this->maxPriority is not a string of digits,
     *                           or does not survive a round trip through intval()
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $maxPriority = strval($this->maxPriority);
        if (!ctype_digit($maxPriority)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // intval() clamps values beyond the integer range, so a mismatch here
        // means the digits could not be represented as a PHP int.
        if (strval(intval($this->maxPriority)) !== $maxPriority) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }

    /**
     * Sets the AMQP connection and opens a channel on it.
     *
     * The setup flag is cleared so that the exchange and queue are declared
     * and bound again on the new channel the next time setupChannel() runs.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->channelSetup = false;
    }

    /**
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Ensures both the queue and exchange arguments have been provided.
     *
     * @throws ArgumentsNotSetException when setQueueArgs() or setExchangeArgs() has not been called
     */
    protected function checkChannelArgs()
    {
        if (empty($this->queueArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }

    /**
     * Declares the exchange and the queue on the channel and binds them.
     *
     * When a max priority is configured, the queue is declared with an
     * 'x-max-priority' argument. The extra arguments are appended to a local
     * copy so that $this->queueArgs is left untouched and this method can be
     * invoked more than once without accumulating duplicate arguments.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // 6th positional argument of queue_declare (nowait in php-amqplib),
            // needed to reach the 7th (the arguments table).
            $declareArgs[] = false;
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);

        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }

    /**
     * Validates the configured arguments and, if not already done for the
     * current channel, declares and binds the exchange and queue.
     *
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }

    /**
     * Publishes the job to the configured exchange.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the same job, with an id assigned if it had none
     *
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     * @throws ArgumentsNotSetException  when queue or exchange arguments are missing
     * @throws PriorityException         when the job has a priority but no max priority is configured
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);

        $msg = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($msg, $job);

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);

        return $job;
    }

    /**
     * Attach a unique id to a job since RabbitMQ will not.
     *
     * The id is built with uniqid() (more-entropy mode) prefixed by
     * "<hostname>-<pid>". Jobs that already have an id are left alone.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job->getId()) {
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
        }
    }

    /**
     * Sets the priority of the AMQPMessage.
     *
     * The message property is only set when a max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $this->maxPriority) {
            $priority = $job->getPriority();
            $msg->set('priority', $priority);
        }
    }

    /**
     * Delegates to the parent calculation, substituting 0 when it yields null.
     *
     * @param mixed $priority
     *
     * @return mixed the parent's result, or 0 when that result is null
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);
        if (null === $priority) {
            return 0;
        }

        return $priority;
    }

    /**
     * Checks that the job can be published by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but no max priority is configured
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $job->getPriority() && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }

    /**
     * Rejects any getJob() filter arguments that differ from their defaults.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException when a worker name or method name is given, or $prioritize is not true
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        if (null !== $workerName || null !== $methodName || true !== $prioritize) {
            throw new UnsupportedException('Unsupported');
        }
    }

    /**
     * Fetches the next non-expired job from the queue.
     *
     * Keeps polling for as long as findJob() reports that the message it
     * pulled was an expired job.
     *
     * @param string|null $workerName must be null
     * @param string|null $methodName must be null
     * @param bool        $prioritize must be true
     * @param mixed       $runId      run id stamped on the returned job
     *
     * @return Job|null the next job, or null when the queue returned no message
     *
     * @throws UnsupportedException     when any filter argument differs from its default
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Pulls a single message from the queue and converts it to a Job.
     *
     * @param bool  $expiredJob set to true (by reference) when the pulled job had already expired
     * @param mixed $runId      run id stamped on the job
     *
     * @return Job|null the job, or null when there was no message or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
            $expiredJob = true;
            // Negatively acknowledge the expired message and record the expiry.
            $this->channel->basic_nack($message->delivery_info['delivery_tag']);
            // NOTE(review): the property name is spelled "jobTiminigManager" (sic);
            // presumably this matches the inherited property - confirm before renaming.
            $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

            return null;
        }
        // Keep the delivery tag on the job so saveHistory() can acknowledge it.
        $job->setDeliveryTag($message->delivery_info['delivery_tag']);

        return $job;
    }

    /**
     * Called upon completion of a job; acknowledges the job's message using
     * the delivery tag stored on the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws ClassNotSubclassException when $job is not a RabbitMQ Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }
        $deliveryTag = $job->getDeliveryTag();
        $this->channel->basic_ack($deliveryTag);
    }

    /**
     * Closes the channel, if one was opened.
     */
    public function __destruct()
    {
        if (null !== $this->channel) {
            $this->channel->close();
        }
    }
}
266