Completed
Push — master ( be2940...32ab89 )
by Matthew
05:55
created

JobManager::setQueueArgs()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 12
ccs 7
cts 8
cp 0.875
rs 9.4285
c 1
b 0
f 0
cc 3
eloc 7
nc 3
nop 5
crap 3.0175
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\PriorityJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class JobManager extends PriorityJobManager
11
{
12
    /** @var AMQPChannel */
13
    protected $channel;
14
15
    /** @var AbstractConnection */
16
    protected $connection;
17
    protected $queueArgs;
18
    protected $exchangeArgs;
19
20
    protected $channelSetup = false;
21
22
    protected $hostname;
23
    protected $pid;
24
25 2
    public function __construct()
26
    {
27 2
        $this->hostname = gethostname() ?: '';
28 2
        $this->pid = getmypid();
29 2
    }
30
31
    /**
32
     * @param string $exchange
33
     * @param string $type
34
     * @param bool   $passive
35
     * @param bool   $durable
36
     * @param bool   $autoDelete
37
     */
38 1
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
39
    {
40 1
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
41 1
    }
42
43
    /**
44
     * @param string $queue
45
     * @param bool   $passive
46
     * @param bool   $durable
47
     * @param bool   $exclusive
48
     * @param bool   $autoDelete
49
     * @param int    $maxPriority
0 ignored issues
show
Bug introduced by
There is no parameter named $maxPriority. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
50
     */
51 1
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
52
    {
53 1
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];
54
55 1
        $this->queueArgs = $arguments;
56 1
        if (!ctype_digit(strval($this->maxPriority))) {
57 1
            throw new \Exception('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
58
        }
59 1
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
60
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
61
        }
62 1
    }
63
64 1
    public function setAMQPConnection(AbstractConnection $connection)
65
    {
66 1
        $this->connection = $connection;
67 1
        $this->channel = $connection->channel();
68 1
    }
69
70
    /**
71
     * @return AMQPChannel
72
     */
73
    public function getChannel()
74
    {
75
        return $this->channel;
76
    }
77
78 7
    public function setupChannel()
79
    {
80 7
        if (empty($this->queueArgs)) {
81 1
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
82
        }
83 7
        if (empty($this->exchangeArgs)) {
84 1
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
85
        }
86
87 7
        if (!$this->channelSetup) {
88 1
            call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
89 1
            if ($this->maxPriority) {
90 1
                array_push($this->queueArgs, false);
91 1
                array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
92
            }
93 1
            call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
94 1
            $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
95 1
            $this->channelSetup = true;
96
        }
97 7
    }
98
99
    /**
100
     * @param \Dtc\QueueBundle\Model\Job $job
101
     *
102
     * @return \Dtc\QueueBundle\Model\Job
103
     *
104
     * @throws \Exception
105
     */
106 6
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
107
    {
108 6
        if (!$job instanceof Job) {
109 1
            throw new \Exception('Must be derived from '.Job::class);
110
        }
111
112 6
        $this->setupChannel();
113
114 6
        $this->validateSaveable($job);
115 6
        $this->setJobId($job);
116
117 6
        $msg = new AMQPMessage($job->toMessage());
118 6
        $this->setMsgPriority($msg, $job);
119
120 6
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
121
122 6
        return $job;
123
    }
124
125
    /**
126
     * Attach a unique id to a job since RabbitMQ will not.
127
     *
128
     * @param \Dtc\QueueBundle\Model\Job $job
129
     */
130 6
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
131
    {
132 6
        if (!$job->getId()) {
133 6
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
134
        }
135 6
    }
136
137
    /**
138
     * Sets the priority of the AMQPMessage.
139
     *
140
     * @param AMQPMessage                $msg
141
     * @param \Dtc\QueueBundle\Model\Job $job
142
     */
143 6
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
144
    {
145 6
        if (null !== $this->maxPriority) {
146 6
            $priority = $job->getPriority();
147 6
            $msg->set('priority', $priority);
148
        }
149 6
    }
150
151 6
    protected function calculatePriority($priority)
152
    {
153 6
        $priority = parent::calculatePriority($priority);
154 6
        if (null === $priority) {
155 5
            return 0;
156
        }
157
158 3
        return $priority;
159
    }
160
161
    /**
162
     * @param \Dtc\QueueBundle\Model\Job $job
163
     *
164
     * @throws \Exception
165
     */
166 6
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
167
    {
168 6
        if (null !== $job->getPriority() && null === $this->maxPriority) {
169
            throw new \Exception('This queue does not support priorities');
170
        }
171
172 6
        if (!$job instanceof Job) {
173
            throw new \Exception('Job needs to be instance of '.Job::class);
174
        }
175 6
    }
176
177
    /**
178
     * @param string $workerName
179
     */
180 7
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
181
    {
182 7
        if (null !== $workerName || null !== $methodName || true !== $prioritize) {
183 2
            throw new \Exception('Unsupported');
184
        }
185
186 6
        $this->setupChannel();
187
188
        do {
189 6
            $expiredJob = false;
190 6
            $job = $this->findJob($expiredJob, $runId);
191 6
        } while ($expiredJob);
192
193 6
        return $job;
194
    }
195
196
    /**
197
     * @param bool $expiredJob
198
     * @param $runId
199
     *
200
     * @return Job|null
201
     */
202 6
    protected function findJob(&$expiredJob, $runId)
203
    {
204 6
        $message = $this->channel->basic_get($this->queueArgs[0]);
205 6
        if ($message) {
206 6
            $job = new Job();
207 6
            $job->fromMessage($message->body);
208 6
            $job->setRunId($runId);
209
210 6
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
211 1
                $expiredJob = true;
212 1
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
213
214 1
                return null;
215
            }
216 6
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);
217
218 6
            return $job;
219
        }
220
221 3
        return null;
222
    }
223
224
    // Save History get called upon completion of the job
225 1
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
226
    {
227 1
        if (!$job instanceof Job) {
228 1
            throw new \Exception("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
229
        }
230 1
        $deliveryTag = $job->getDeliveryTag();
231 1
        $this->channel->basic_ack($deliveryTag);
232
233 1
        return;
234
    }
235
236 3
    public function __destruct()
237
    {
238 3
        if (null !== $this->channel) {
239 2
            $this->channel->close();
240
        }
241 3
    }
242
}
243