Queue::send()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 23
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 8
nc 1
nop 2
dl 0
loc 23
ccs 0
cts 11
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Canvas\Queue;
6
7
use PhpAmqpLib\Message\AMQPMessage;
0 ignored issues
show
Bug introduced by
The type PhpAmqpLib\Message\AMQPMessage was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
8
use Phalcon\Di;
9
10
class Queue
11
{
12
    /**
13
     * default canvas queues system name.
14
     */
15
    const EVENTS = 'events';
16
    const NOTIFICATIONS = 'notifications';
17
    const JOBS = 'jobs';
18
19
    /**
20
     * Send a msg to Queue.
21
     *
22
     * @param string $name
23
     * @param array|object|mixed $msg
24
     * @return bool
25
     */
26
    public static function send(string $name, $msg): bool
27
    {
28
        $queue = Di::getDefault()->get('queue');
29
30
        $channel = $queue->channel();
31
32
        /*
33
            name: $queueName
34
            passive: false
35
            durable: true // the queue will survive server restarts
36
            exclusive: false // the queue can be accessed in other channels
37
            auto_delete: false //the queue won't be deleted once the channel is closed.
38
        */
39
        $channel->queue_declare($name, false, true, false, false);
40
41
        $msg = new AMQPMessage($msg, [
42
            'delivery_mode' => 2
43
        ]);
44
45
        $channel->basic_publish($msg, '', $name);
46
        $channel->close();
47
48
        return true;
49
    }
50
51
    /**
52
     * Process a specify queue.
53
     *
54
     * @param string $queueName
55
     * @param callable $callback
56
     * @return void
57
     */
58
    public static function process(string $queueName, callable $callback): void
59
    {
60
        $queue = Di::getDefault()->get('queue');
61
        Di::getDefault()->get('log')->info('Starting Queue '.$queueName);
62
        
63
        /**
64
         * Use Swoole Coroutine  
65
         */
66
        go(function () use ($queue, $queueName, $callback) {
0 ignored issues
show
Bug introduced by
The function go was not found. Maybe you did not declare it correctly or list all dependencies? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

66
        /** @scrutinizer ignore-call */ 
67
        go(function () use ($queue, $queueName, $callback) {
Loading history...
67
            $channel = $queue->channel();
68
69
            $channel->queue_declare($queueName, false, true, false, false);
70
71
            /*
72
                queueName: Queue from where to get the messages
73
                consumer_tag: Consumer identifier
74
                no_local: Don't receive messages published by this consumer.
75
                no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
76
                exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
77
                nowait:
78
                callback: A PHP Callback
79
            */
80
            $channel->basic_consume($queueName, '', false, true, false, false, $callback);
81
82
            while ($channel->is_consuming()) {
83
                $channel->wait();
84
            }
85
86
            $channel->close();
87
            $queue->close();
88
        });
89
    }
90
}
91