Test Failed
Branch add-core-tests (42298d)
by Rafael
10:47
created

Queue   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 74
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 20
c 2
b 0
f 0
dl 0
loc 74
ccs 0
cts 23
cp 0
rs 10
wmc 3

2 Methods

Rating   Name   Duplication   Size   Complexity  
A process() 0 25 2
A send() 0 23 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Canvas\Queue;
6
7
use PhpAmqpLib\Message\AMQPMessage;
8
use Phalcon\Di;
9
10
class Queue
11
{
12
    /**
13
     * default canvas queues system name
14
     */
15
    const EVENTS = 'events';
16
    const NOTIFICATIONS = 'notifications';
17
    const JOBS = 'jobs';
18
19
    /**
20
     * Send a msg to Queue
21
     *
22
     * @param string $name
23
     * @param array|object|mixed $msg
24
     * @return bool
25
     */
26
    public static function send(string $name, $msg): bool
27
    {
28
        $queue = Di::getDefault()->get('queue');
29
30
        $channel = $queue->channel();
31
32
        /*
33
            name: $queueName
34
            passive: false
35
            durable: true // the queue will survive server restarts
36
            exclusive: false // the queue can be accessed in other channels
37
            auto_delete: false //the queue won't be deleted once the channel is closed.
38
        */
39
        $channel->queue_declare($name, false, true, false, false);
40
41
        $msg = new AMQPMessage($msg, [
42
            'delivery_mode' => 2
43
        ]);
44
45
        $channel->basic_publish($msg, '', $name);
46
        $channel->close();
47
48
        return true;
49
    }
50
51
52
    /**
53
     * Process a specify queue
54
     *
55
     * @param string $queueName
56
     * @param callable $callback
57
     * @return void
58
     */
59
    public static function process(string $queueName, callable $callback): void
60
    {
61
        $queue = Di::getDefault()->get('queue');
62
63
        $channel = $queue->channel();
64
        
65
        $channel->queue_declare($queueName, false, true, false, false);
66
67
        /*
68
            queueName: Queue from where to get the messages
69
            consumer_tag: Consumer identifier
70
            no_local: Don't receive messages published by this consumer.
71
            no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
72
            exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
73
            nowait:
74
            callback: A PHP Callback
75
        */
76
        $channel->basic_consume($queueName, '', false, true, false, false, $callback);
77
78
        while ($channel->is_consuming()) {
79
            $channel->wait();
80
        }
81
82
        $channel->close();
83
        $queue->close();
84
    }
85
}
86