Completed
Push — master ( 17631a...07adb9 )
by Matthew
05:11
created

JobManager::setupChannel()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 11.643

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 5
cts 14
cp 0.357
rs 8.8571
c 0
b 0
f 0
cc 5
eloc 13
nc 5
nop 0
crap 11.643
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
/**
 * Job manager that stores and retrieves jobs through a RabbitMQ (AMQP) queue.
 *
 * Usage order: setExchangeArgs() / setQueueArgs() / setAMQPConnection() must be
 * called before save() or getJob(); the exchange, queue and binding are declared
 * lazily by setupChannel() on first use.
 */
class JobManager extends AbstractJobManager
{
    /** @var AMQPChannel|null Channel obtained from the connection; null until setAMQPConnection() is called */
    protected $channel;

    /** @var AbstractConnection|null */
    protected $connection;

    /** @var array|null Positional arguments for queue_declare (queue name is element 0) */
    protected $queueArgs;

    /** @var array|null Positional arguments for exchange_declare (exchange name is element 0) */
    protected $exchangeArgs;

    /** @var bool True once exchange, queue and binding were declared through the current channel */
    protected $channelSetup = false;

    /** @var string Host name used as part of generated job ids */
    protected $hostname;

    /** @var int|false Process id used as part of generated job ids */
    protected $pid;

    /** @var int|string|null Maximum priority of the queue; a falsy value means priorities are disabled */
    protected $maxPriority;

    /**
     * Captures the host name and process id that prefix generated job ids.
     */
    public function __construct()
    {
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
    }

    /**
     * Stores the positional arguments later passed to exchange_declare.
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the positional arguments later passed to queue_declare and the
     * maximum priority of the queue.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     * @param int    $maxPriority non-negative integer; 0 disables priorities
     *
     * @throws \Exception when $maxPriority is not a non-negative integer that fits in a PHP int
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete, $maxPriority)
    {
        // Validate first so that a rejected call leaves the manager's state untouched.
        if (!ctype_digit(strval($maxPriority))) {
            throw new \Exception('Max Priority needs to be a non-negative integer');
        }
        // A digit string that does not survive the int round trip does not fit in a PHP int.
        if (strval(intval($maxPriority)) !== strval($maxPriority)) {
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
        }

        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];
        $this->maxPriority = $maxPriority;
    }

    /**
     * Sets the AMQP connection and opens the channel used for all operations.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        // Nothing has been declared through the new channel yet, so force
        // setupChannel() to declare exchange, queue and binding again.
        $this->channelSetup = false;
    }

    /**
     * @return AMQPChannel|null the current channel, or null when no connection was set
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Declares the exchange and the queue and binds them together, once per channel.
     *
     * @throws \Exception when queue args, exchange args or the connection have not been set
     */
    public function setupChannel()
    {
        if (empty($this->queueArgs)) {
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
        if (null === $this->channel) {
            throw new \Exception(__METHOD__.': connection needs to be set via setAMQPConnection(...)');
        }

        if ($this->channelSetup) {
            return;
        }

        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Work on a copy: appending to $this->queueArgs would accumulate extra
        // positional arguments every time the declaration has to be retried.
        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Sixth positional argument of queue_declare, followed by the arguments table.
            $declareArgs[] = false;
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);

        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
        $this->channelSetup = true;
    }

    /**
     * Publishes the job to the exchange, assigning it an id when it has none.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of the RabbitMQ Job class
     *
     * @return \Dtc\QueueBundle\Model\Job the job that was published
     *
     * @throws \Exception when the job carries a priority but the queue has none configured,
     *                    or when the job is not a RabbitMQ Job
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        $this->setupChannel();
        if (!$job->getId()) {
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
        }

        $priority = $job->getPriority();
        if (null !== $priority && !$this->maxPriority) {
            throw new \Exception('This queue does not support priorities');
        }

        if (!$job instanceof Job) {
            throw new \Exception('Job needs to be instance of '.Job::class);
        }

        $msg = new AMQPMessage($job->toMessage());

        if ($this->maxPriority) {
            // The message priority is the job priority subtracted from the maximum;
            // jobs without a priority are published with priority 0.
            $priority = null === $priority ? 0 : $this->maxPriority - $priority;
            $msg->set('priority', $priority);
        }
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);

        return $job;
    }

    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired messages are negatively acknowledged and skipped.
     *
     * @param string $workerName
     * @param string $methodName must be empty; filtering by method is not supported
     * @param bool   $prioritize
     * @param mixed  $runId      run id stored on the returned job
     *
     * @return Job|null the next job, or null when the queue has no more messages
     *
     * @throws \Exception when $methodName is given
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        if ($methodName) {
            throw new \Exception('Unsupported');
        }

        $this->setupChannel();

        do {
            // Must be reset on every pass: `continue` jumps straight to the loop
            // condition, so a stale `true` would keep polling an empty queue forever.
            $expiredJob = false;

            $message = $this->channel->basic_get($this->queueArgs[0]);
            if ($message) {
                $job = new Job();
                $job->fromMessage($message->body);
                $job->setRunId($runId);

                if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
                    $expiredJob = true;
                    $this->channel->basic_nack($message->delivery_info['delivery_tag']);
                    continue;
                }
                $job->setDeliveryTag($message->delivery_info['delivery_tag']);

                return $job;
            }
        } while ($expiredJob);

        return null;
    }

    /**
     * Called upon completion of the job; acknowledges the job's message.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of the RabbitMQ Job class
     *
     * @throws \Exception when the job is not a RabbitMQ Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \Exception("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }
        $deliveryTag = $job->getDeliveryTag();
        $this->channel->basic_ack($deliveryTag);
    }

    /**
     * Closes the channel, if one was ever opened.
     */
    public function __destruct()
    {
        // The channel only exists after setAMQPConnection(); without this guard
        // destroying an unconnected manager is a fatal error.
        if (null !== $this->channel) {
            $this->channel->close();
        }
    }
}
177