Completed
Push — master ( 8a247e...4109b7 )
by Matthew
05:12
created

JobManager::calculatePriority()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 8
cp 0
rs 9.6666
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 1
crap 6
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\PriorityJobManager;
6
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
7
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
8
use Dtc\QueueBundle\Exception\PriorityException;
9
use Dtc\QueueBundle\Exception\UnsupportedException;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class JobManager extends PriorityJobManager
15
{
16
    /** @var AMQPChannel Channel obtained from the connection in setAMQPConnection(); null until then */
17
    protected $channel;
18
19
    /** @var AbstractConnection Connection supplied through setAMQPConnection() */
20
    protected $connection;
21
    /** @var array|null Positional queue declaration arguments stored by setQueueArgs() */
    protected $queueArgs;
22
    /** @var array|null Positional exchange declaration arguments stored by setExchangeArgs() */
    protected $exchangeArgs;
23
24
    /** @var bool Set to true by setupChannel() once performChannelSetup() has run */
    protected $channelSetup = false;
25
26
    /** @var string Host name captured in the constructor; empty string when gethostname() yields a falsy value */
    protected $hostname;
27
    /** @var int|false Value returned by getmypid() in the constructor */
    protected $pid;
28
29
    /**
     * Captures the host name and process id of the current PHP process.
     *
     * Both values are combined into the prefix of generated job ids by setJobId().
     */
    public function __construct()
    {
        $host = gethostname();
        if (!$host) {
            // A failed (false) or otherwise falsy lookup is normalised to an empty string.
            $host = '';
        }

        $this->hostname = $host;
        $this->pid = getmypid();
    }
34
35
    /**
     * Remembers the arguments used to declare the exchange.
     *
     * The values are stored in positional order and later passed, unchanged,
     * to the channel's exchange_declare() call by performChannelSetup().
     *
     * @param string $exchange   exchange name (also used when binding and publishing)
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $declaration = [
            $exchange,
            $type,
            $passive,
            $durable,
            $autoDelete,
        ];

        $this->exchangeArgs = $declaration;
    }
46
47
    /**
     * Remembers the queue declaration arguments and validates the configured max priority.
     *
     * The arguments are stored before the max priority is checked, so they
     * stay set even when a PriorityException is thrown.
     *
     * @param string $queue      queue name (also used when binding and fetching)
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when the max priority does not consist solely of digits,
     *                           or does not survive a round trip through an integer
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $maxPriorityText = (string) $this->maxPriority;
        if (!ctype_digit($maxPriorityText)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }

        // A digit string that changes when cast to int and back does not fit in a PHP integer.
        $roundTripped = (string) (int) $this->maxPriority;
        if ($roundTripped !== $maxPriorityText) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }
66
67
    /**
     * Registers the AMQP connection and obtains the channel used for all broker calls.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $channel = $this->connection->channel();
        $this->channel = $channel;
    }
72
73
    /**
     * Exposes the channel obtained in setAMQPConnection().
     *
     * @return AMQPChannel|null null as long as no connection has been supplied
     */
    public function getChannel()
    {
        $channel = $this->channel;

        return $channel;
    }
80
81
    /**
     * Verifies that both the queue and the exchange arguments have been supplied.
     *
     * The queue arguments are checked first.
     *
     * @throws ArgumentsNotSetException when either set of arguments is missing or empty
     */
    protected function checkChannelArgs()
    {
        $queueArgsMissing = !$this->queueArgs;
        if ($queueArgsMissing) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }

        $exchangeArgsMissing = !$this->exchangeArgs;
        if ($exchangeArgsMissing) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }
90
91
    /**
     * Declares the exchange and the queue on the channel and binds the queue to the exchange.
     *
     * When a max priority is configured, the queue is declared with an
     * "x-max-priority" argument table. The extra declaration arguments are
     * built on a local copy, so the arguments stored by setQueueArgs() are
     * left untouched and repeated invocations cannot pile up additional
     * trailing arguments.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        $queueDeclaration = $this->queueArgs;
        if ($this->maxPriority) {
            // Positional filler for the parameter that sits between the stored
            // arguments and the argument table (nowait in php-amqplib's queue_declare).
            $queueDeclaration[] = false;
            $queueDeclaration[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $queueDeclaration);

        // First stored argument of each set is the queue / exchange name.
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }
101
102
    /**
     * Declares the exchange and queue on first use; later calls only re-check the arguments.
     *
     * @throws ArgumentsNotSetException when queue or exchange arguments have not been set
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }
111
112
    /**
     * Publishes a job to the configured exchange.
     *
     * The channel is set up if needed, the job is validated, an id is assigned
     * when the job has none, and the job's message is published with its
     * priority applied.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the job that was passed in
     *
     * @throws ClassNotSubclassException when the job is not an instance of this namespace's Job
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        $isSupportedJob = $job instanceof Job;
        if (!$isSupportedJob) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();
        $this->validateSaveable($job);
        $this->setJobId($job);

        $message = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($message, $job);

        // The first stored exchange argument is the exchange name.
        $exchange = $this->exchangeArgs[0];
        $this->channel->basic_publish($message, $exchange);

        return $job;
    }
137
138
    /**
     * Gives the job a unique id when it does not have one yet, because RabbitMQ will not assign one.
     *
     * The id is generated by uniqid() with a "<hostname>-<pid>" prefix and
     * extra entropy enabled. Jobs that already carry an id are left unchanged.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $prefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($prefix, true));
    }
149
150
    /**
     * Copies the job's priority onto the AMQP message.
     *
     * Nothing is set when no max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null === $this->maxPriority) {
            return;
        }

        $msg->set('priority', $job->getPriority());
    }
163
164
    /**
     * Delegates to the parent calculation and substitutes 0 when it yields null.
     *
     * @param mixed $priority value passed through to the parent implementation
     *
     * @return mixed the parent's result, or 0 when that result is null
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        return null === $calculated ? 0 : $calculated;
    }
173
174
    /**
     * Checks that a job can be saved by this manager.
     *
     * The priority check runs before the type check.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but no max priority is configured
     * @throws ClassNotSubclassException when the job is not an instance of this namespace's Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $requestedPriority = $job->getPriority();
        if (null !== $requestedPriority) {
            if (null === $this->maxPriority) {
                throw new PriorityException('This queue does not support priorities');
            }
        }

        $isSupportedJob = $job instanceof Job;
        if (!$isSupportedJob) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }
190
191
    /**
     * Rejects any getJob() filtering options, none of which this manager supports.
     *
     * Only the defaults are accepted: no worker name, no method name, and
     * $prioritize strictly equal to true.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException when any argument differs from its default
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $onlyDefaults = null === $workerName
            && null === $methodName
            && true === $prioritize;

        if (!$onlyDefaults) {
            throw new UnsupportedException('Unsupported');
        }
    }
197
198
    /**
     * Fetches the next job from the queue that has not expired.
     *
     * Expired jobs reported by findJob() are skipped, and fetching continues
     * until findJob() returns without flagging an expired job.
     *
     * @param string|null $workerName must be null
     * @param string|null $methodName must be null
     * @param bool        $prioritize must be true
     * @param mixed       $runId      run id attached to the fetched job
     *
     * @return Job|null the fetched job, or null when findJob() found no message
     *
     * @throws UnsupportedException when a filtering argument differs from its default
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        while (true) {
            // findJob() sets this flag by reference when it discards an expired job.
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);

            if (!$expiredJob) {
                return $job;
            }
        }
    }
213
214
    /**
     * Pulls a single message from the queue and turns it into a job.
     *
     * A job whose expiry timestamp lies before the current time is negatively
     * acknowledged and reported through $expiredJob instead of being returned.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job had expired
     * @param mixed $runId      run id attached to the fetched job
     *
     * @return Job|null the job, or null when no message was available or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();
        if ($hasExpired) {
            $expiredJob = true;
            $this->channel->basic_nack($message->delivery_info['delivery_tag']);

            return null;
        }

        // Keep the delivery tag so saveHistory() can acknowledge the message later.
        $job->setDeliveryTag($message->delivery_info['delivery_tag']);

        return $job;
    }
241
242
    /**
     * Acknowledges the job's message on the channel; called once the job has completed.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws ClassNotSubclassException when the job is not an instance of this namespace's Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job instanceof Job) {
            $this->channel->basic_ack($job->getDeliveryTag());

            return;
        }

        throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
    }
253
254
    /**
     * Closes the channel, if one was obtained, when the manager is destroyed.
     *
     * Closing is best effort. An exception escaping a destructor while the
     * script is terminating is a fatal error, so a failure to close is
     * swallowed here instead of being propagated.
     */
    public function __destruct()
    {
        if (null === $this->channel) {
            return;
        }

        try {
            $this->channel->close();
        } catch (\Exception $exception) {
            // Deliberately ignored: there is nothing useful left to do with a
            // channel that cannot be closed during teardown.
        }
    }
260
}
261