Completed
Push — master ( 0b0f4b...0669e2 )
by Matthew
03:15
created

JobManager::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
cc 2
eloc 3
nc 2
nop 0
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class JobManager extends AbstractJobManager
11
{
12
    /** @var AMQPChannel */
13
    protected $channel;
14
15
    /** @var AbstractConnection */
16
    protected $connection;
17
    protected $queueArgs;
18
    protected $exchangeArgs;
19
20
    protected $channelSetup = false;
21
22
    protected $hostname;
23
    protected $pid;
24
    protected $maxPriority;
25
26 2
    /**
     * Captures the local host name and the current process id.
     *
     * Both values are later joined into the prefix that setJobId() passes to
     * uniqid() when it generates an id for a job.
     */
    public function __construct()
    {
        $host = gethostname();

        // Fall back to an empty string when gethostname() yields a falsy value.
        $this->hostname = $host ?: '';
        $this->pid = getmypid();
    }
31
32
    /**
     * Stores the arguments used to declare the exchange.
     *
     * The values are kept in positional order; setupChannel() forwards them
     * to the channel's exchange_declare() and uses the first one (the
     * exchange name) for binding and publishing.
     *
     * @param string $exchange   exchange name
     * @param string $type       exchange type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $declaration = [
            $exchange,
            $type,
            $passive,
            $durable,
            $autoDelete,
        ];

        $this->exchangeArgs = $declaration;
    }
43
44
    /**
     * Stores the arguments used to declare the queue, plus its maximum priority.
     *
     * The maximum priority is validated before anything is stored, so a
     * rejected call leaves the previously configured values untouched.
     *
     * @param string     $queue       queue name
     * @param bool       $passive
     * @param bool       $durable
     * @param bool       $exclusive
     * @param bool       $autoDelete
     * @param int|string $maxPriority non-negative integer (digits only); a falsy
     *                                value such as 0 leaves priorities disabled
     *
     * @throws \Exception when $maxPriority is not made of digits only, or does
     *                    not survive a round-trip through intval()
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete, $maxPriority)
    {
        $priorityText = strval($maxPriority);

        if (!ctype_digit($priorityText)) {
            throw new \Exception('Max Priority needs to be a non-negative integer');
        }
        // A digit string that changes when converted to int and back does not
        // fit in a PHP integer (presumably because it exceeds PHP_INT_MAX).
        if (strval(intval($maxPriority)) !== $priorityText) {
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
        }

        // Only mutate state once the input is known to be valid.
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];
        $this->maxPriority = $maxPriority;
    }
65
66 1
    /**
     * Stores the AMQP connection and opens the channel this manager works on.
     *
     * @param AbstractConnection $connection connection to obtain the channel from
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $openedChannel = $connection->channel();
        $this->channel = $openedChannel;
    }
71
72
    /**
     * Returns the channel stored by setAMQPConnection().
     *
     * @return AMQPChannel the channel; the property holds no value until
     *                     setAMQPConnection() has been called
     */
    public function getChannel()
    {
        return $this->channel;
    }
79
80 6
    /**
     * Declares the exchange and the queue and binds them, once per instance.
     *
     * Subsequent calls are no-ops after the first successful setup. When a
     * maximum priority is configured, the queue is declared with an
     * 'x-max-priority' argument.
     *
     * @throws \Exception when the queue or exchange arguments have not been set
     */
    public function setupChannel()
    {
        if (empty($this->queueArgs)) {
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }

        if ($this->channelSetup) {
            return;
        }

        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Build the declaration arguments on a copy: appending to
        // $this->queueArgs would add the extra entries a second time if a
        // failed declaration is retried, corrupting the positional list.
        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Sixth positional argument (presumably php-amqplib's $nowait),
            // followed by the queue arguments table.
            $declareArgs[] = false;
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }

        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
        $this->channelSetup = true;
    }
100
101 5
    /**
     * Publishes a job to the configured exchange.
     *
     * The channel is set up if needed, the job is validated, given an id when
     * it has none, converted to an AMQP message and published.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be a RabbitMQ Job instance
     *
     * @return \Dtc\QueueBundle\Model\Job the same job that was passed in
     *
     * @throws \Exception when the job has the wrong type, the channel cannot
     *                    be set up, or the job fails validation
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new \Exception('Must be derived from '.Job::class);
        }

        $this->setupChannel();
        $this->validateSaveable($job);
        $this->setJobId($job);

        $payload = $job->toMessage();
        $amqpMessage = new AMQPMessage($payload);
        $this->setMsgPriority($amqpMessage, $job);

        $exchangeName = $this->exchangeArgs[0];
        $this->channel->basic_publish($amqpMessage, $exchangeName);

        return $job;
    }
126
127
    /**
     * Gives the job a unique id when it does not have one yet, since
     * RabbitMQ will not assign one.
     *
     * The id is produced by uniqid() with a "<hostname>-<pid>" prefix and
     * more entropy enabled.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $prefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($prefix, true));
    }
138 5
139
    /**
     * Sets the 'priority' property of the AMQP message from the job's priority.
     *
     * Does nothing when no maximum priority is configured. A job without a
     * priority gets 0; otherwise the message priority is the configured
     * maximum minus the job's priority.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws \Exception when the job's priority exceeds the configured maximum
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (!$this->maxPriority) {
            return;
        }

        $jobPriority = $job->getPriority();
        if ($jobPriority > $this->maxPriority) {
            throw new \Exception('Priority must be lower than '.$this->maxPriority);
        }

        if (null === $jobPriority) {
            $amqpPriority = 0;
        } else {
            $amqpPriority = $this->maxPriority - $jobPriority;
        }

        $msg->set('priority', $amqpPriority);
    }
158 5
159 5
    /**
     * Checks that a job can be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws \Exception when the job carries a priority but no maximum
     *                    priority is configured, or when the job is not a
     *                    RabbitMQ Job instance
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $requestsPriority = null !== $job->getPriority();
        if ($requestsPriority && !$this->maxPriority) {
            throw new \Exception('This queue does not support priorities');
        }

        if ($job instanceof Job) {
            return;
        }

        throw new \Exception('Job needs to be instance of '.Job::class);
    }
174
175
    /**
     * Fetches the next job from the queue, skipping expired ones.
     *
     * Filtering by worker or method name and turning prioritization off are
     * not supported by this manager; any such request is rejected.
     *
     * @param string|null $workerName must be null
     * @param string|null $methodName must be null
     * @param bool        $prioritize must be true
     * @param mixed       $runId      run id that is set on the returned job
     *
     * @return Job|null the next job, or null when no message was available
     *
     * @throws \Exception when an unsupported argument is given or the channel
     *                    cannot be set up
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        if (null !== $workerName || null !== $methodName || true !== $prioritize) {
            throw new \Exception('Unsupported');
        }

        $this->setupChannel();

        // findJob() reports an expired message through $expiredJob; keep
        // fetching until a call completes without flagging one.
        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }
194
195
    /**
     * Fetches a single message from the queue and turns it into a job.
     *
     * An expired job is negatively acknowledged and reported to the caller
     * through $expiredJob instead of being returned.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job
     *                          had already expired; otherwise left unchanged
     * @param mixed $runId      run id to set on the job
     *
     * @return Job|null the job, or null when there was no message or the job
     *                  had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();
        $deliveryTag = $message->delivery_info['delivery_tag'];

        if ($hasExpired) {
            $expiredJob = true;
            $this->channel->basic_nack($deliveryTag);

            return null;
        }

        // Remember the tag so the message can be acknowledged later.
        $job->setDeliveryTag($deliveryTag);

        return $job;
    }
222
223
    /**
     * Called upon completion of a job: acknowledges the job's message on the
     * channel using the delivery tag stored on the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be a RabbitMQ Job instance
     *
     * @throws \Exception when the job is not a RabbitMQ Job instance
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job instanceof Job) {
            $this->channel->basic_ack($job->getDeliveryTag());

            return;
        }

        throw new \Exception("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
    }
234
235
    /**
     * Closes the channel, if one was ever opened, when the manager is destroyed.
     */
    public function __destruct()
    {
        if (null === $this->channel) {
            return;
        }

        $this->channel->close();
    }
241
}
242