RabbitMqQueue::pushRaw()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 8
cp 0
rs 9.6666
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 3
crap 6
1
<?php
2
3
namespace Ccovey\LaravelRabbitMQ;
4
5
use Ccovey\RabbitMQ\Consumer\ConsumableParameters;
6
use Ccovey\RabbitMQ\Consumer\ConsumerInterface;
7
use Ccovey\RabbitMQ\Producer\Message;
8
use Ccovey\RabbitMQ\Producer\ProducerInterface;
9
use DateTime;
10
use Illuminate\Queue\Queue;
11
use Illuminate\Contracts\Queue\Queue as QueueContract;
12
13
class RabbitMqQueue extends Queue implements QueueContract
14
{
15
    /**
16
     * @var ConsumerInterface
17
     */
18
    private $consumer;
19
20
    /**
21
     * @var ProducerInterface
22
     */
23
    private $producer;
24
25
    /**
26
     * @var array
27
     */
28
    private $config;
29
30
    public function __construct(
31
        ConsumerInterface $consumer,
32
        ProducerInterface $producer,
33
        array $config
34
    )
35
    {
36
        $this->consumer = $consumer;
37
        $this->producer = $producer;
38
        $this->config = $config;
39
    }
40
41
    /**
42
     * Push a new job onto the queue.
43
     *
44
     * @param  string $job
45
     * @param  mixed  $data
46
     * @param  string $queue
47
     *
48
     * @return mixed
49
     */
50
    public function push($job, $data = '', $queue = null)
51
    {
52
        $this->pushRaw($this->createPayload($job, $data), $this->getQueueName($queue));
53
    }
54
55
    /**
56
     * Push a raw payload onto the queue.
57
     *
58
     * @param  string $payload
59
     * @param  string $queue
60
     * @param  array  $options
61
     *
62
     * @return mixed
63
     */
64
    public function pushRaw($payload, $queue = null, array $options = [])
65
    {
66
        if (isset($options['delay'])) {
67
            $payload['scheduledAt'] = new DateTime(sprintf('+%s', $options['delay']));
68
        }
69
70
        $message = new Message(json_decode($payload, 1), $this->getQueueName($queue));
71
        $this->producer->publish($message);
72
    }
73
74
    /**
75
     * Push a new job onto the queue after a delay.
76
     *
77
     * @param  \DateTime|int $delay
78
     * @param  string        $job
79
     * @param  mixed         $data
80
     * @param  string        $queue
81
     *
82
     * @return mixed
83
     */
84
    public function later($delay, $job, $data = '', $queue = null)
85
    {
86
        $this->pushRaw($this->createPayload($job, $data), $this->getQueueName($queue), ['delay' => $delay]);
87
    }
88
89
    /**
90
     * Pop the next job off of the queue.
91
     *
92
     * @param  string $queue
93
     *
94
     * @return \Illuminate\Contracts\Queue\Job|null
95
     */
96
    public function pop($queue = null)
97
    {
98
        $params = new ConsumableParameters($this->getQueueName($queue));
99
        $message = $this->consumer->getMessage($params);
100
101
        if (!$message) {
102
            return null;
103
        }
104
105
        return new RabbitMqJob($this->container, $this->producer, $this->consumer, $message);
106
    }
107
108
    /**
109
     * Get the size of the queue.
110
     *
111
     * @param  string $queue
112
     *
113
     * @return int
114
     */
115
    public function size($queue = null)
116
    {
117
        return $this->consumer->getSize($this->getQueueName($queue));
118
    }
119
120
    private function getQueueName($queue)
121
    {
122
        return $queue ?: 'queueName';
123
    }
124
}
125