RabbitMQQueue::pop()   A
last analyzed

Complexity

Conditions 2
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 1
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Noitran\Lumen\Horizon;
4
5
use Noitran\Lumen\Horizon\Jobs\RabbitMQJob;
6
use Illuminate\Contracts\Events\Dispatcher;
7
use Illuminate\Support\Str;
8
use Laravel\Horizon\Events\JobDeleted;
9
use Laravel\Horizon\Events\JobPushed;
10
use Laravel\Horizon\Events\JobReserved;
11
use Laravel\Horizon\JobId;
12
use Laravel\Horizon\JobPayload;
13
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseQueue;
14
15
class RabbitMQQueue extends BaseQueue
{
    /**
     * The job most recently handed to push() or release().
     *
     * pushRaw() passes it to JobPayload::prepare() when building the payload.
     *
     * @var object|string
     */
    protected $lastPushed;

    /**
     * Get the number of queue jobs that are ready to process.
     *
     * Delegates to size() for the given queue.
     *
     * @param  string|null  $queue
     *
     * @return int
     */
    public function readyNow($queue = null): int
    {
        return $this->size($queue);
    }

    /**
     * Remember the job being pushed, then delegate to the parent push().
     *
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed whatever the parent push() returns
     */
    public function push($job, $data = '', $queue = null)
    {
        $this->lastPushed = $job;

        return parent::push($job, $data, $queue);
    }

    /**
     * Push a raw payload and fire a JobPushed event for it.
     *
     * The payload is wrapped in a JobPayload and prepared with the job stored
     * in $lastPushed before it is handed to the parent pushRaw().
     *
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options
     *
     * @return mixed whatever the parent pushRaw() returns
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value;

        return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload): void {
            $this->event($this->getQueue($queue), new JobPushed($payload));
        });
    }

    /**
     * Push a job with a delay and fire a JobPushed event for it.
     *
     * The payload is built and prepared here with the given job, then sent
     * through the parent pushRaw() with a 'delay' option, so this class's own
     * pushRaw() (and $lastPushed) is not involved.
     *
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed whatever the parent pushRaw() returns
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        // NOTE(review): createPayload() is called with ($job, $data) while
        // createPayloadArray() in this class takes ($job, $queue, $data).
        // Confirm against the parent's createPayload() signature that $data
        // is not being passed in the $queue position.
        $payload = (new JobPayload($this->createPayload($job, $data)))->prepare($job)->value;

        $options = [
            'delay' => $this->secondsUntil($delay),
        ];

        return tap(parent::pushRaw($payload, $queue, $options), function () use ($payload, $queue): void {
            $this->event($this->getQueue($queue), new JobPushed($payload));
        });
    }

    /**
     * Try to fetch the next message from the queue without firing any event.
     *
     * Any \Exception raised while declaring the queue, creating the consumer
     * or receiving is passed to reportConnectionError().
     *
     * @param  string|null  $queueName
     *
     * @return \Noitran\Lumen\Horizon\Jobs\RabbitMQJob|null the wrapped message, or null when
     *         nothing was received or reportConnectionError() returned normally
     */
    protected function popRaw($queueName = null): ?RabbitMQJob
    {
        try {
            [$queue] = $this->declareEverything($queueName);

            $consumer = $this->getContext()->createConsumer($queue);

            if ($message = $consumer->receiveNoWait()) {
                return new RabbitMQJob($this->container, $this, $consumer, $message);
            }
        } catch (\Exception $exception) {
            $this->reportConnectionError('pop', $exception);
        }

        // A nullable return type still requires an explicit return; falling
        // off the end would raise a TypeError ("none returned").
        return null;
    }

    /**
     * Pop the next job off the queue and fire a JobReserved event for it.
     *
     * No event is fired when popRaw() yields nothing.
     *
     * @param  string|null  $queue
     *
     * @return \Noitran\Lumen\Horizon\Jobs\RabbitMQJob|null
     */
    public function pop($queue = null)
    {
        return tap($this->popRaw($queue), function ($result) use ($queue): void {
            if ($result) {
                $this->event($this->getQueue($queue), new JobReserved($result->getRawBody()));
            }
        });
    }

    /**
     * Remember the job being released, then delegate to the parent release().
     *
     * @param  mixed  $delay
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string  $queue
     * @param  int  $attempts
     *
     * @return mixed whatever the parent release() returns
     */
    public function release($delay, $job, $data, $queue, $attempts = 0)
    {
        $this->lastPushed = $job;

        return parent::release($delay, $job, $data, $queue, $attempts);
    }

    /**
     * Fire the given event if a dispatcher is bound.
     *
     * The first occurrence of "queues:" is removed from the queue name, and the
     * event is tagged with this connection's name and that queue name before
     * being dispatched.
     *
     * @param  string  $queue
     * @param  mixed  $event
     *
     * @return void
     */
    protected function event($queue, $event): void
    {
        if ($this->container && $this->container->bound(Dispatcher::class)) {
            $queue = Str::replaceFirst('queues:', '', $queue);

            $this->container->make(Dispatcher::class)->dispatch(
                $event->connection($this->getConnectionName())->queue($queue)
            );
        }
    }

    /**
     * Create a payload array from the given job and data.
     *
     * Extends the parent's payload array with a freshly generated 'id' and an
     * 'attempts' counter starting at 0.
     *
     * @param  string  $job
     * @param  string  $queue
     * @param  mixed   $data
     *
     * @return array
     */
    protected function createPayloadArray($job, $queue, $data = ''): array
    {
        return array_merge(parent::createPayloadArray($job, $queue, $data), [
            'id' => $this->getRandomId(),
            'attempts' => 0,
        ]);
    }

    /**
     * Fire the job deleted event.
     *
     * @param  string  $queue
     * @param  \Noitran\Lumen\Horizon\Jobs\RabbitMQJob  $job
     *
     * @return void
     */
    public function deleteReserved($queue, $job): void
    {
        $this->event($this->getQueue($queue), new JobDeleted($job, $job->getRawBody()));
    }

    /**
     * Get the queue name.
     *
     * Falls back to this connection's $queueName when the argument is empty.
     *
     * @param string|null $queue
     *
     * @return string
     */
    protected function getQueue($queue = null): string
    {
        return $queue ?: $this->queueName;
    }

    /**
     * Get a random ID string.
     *
     * Delegates to JobId::generate().
     *
     * @return string
     */
    protected function getRandomId(): string
    {
        return JobId::generate();
    }
}
185