Passed
Push — master ( c65021...b58a23 )
by Murilo
02:50
created

RabbitReceiver::consume()   A

Complexity

Conditions 5
Paths 5

Size

Total Lines 25
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 25
rs 9.4888
c 0
b 0
f 0
cc 5
nc 5
nop 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Source\Infra\Queue\RabbitMQ;
6
7
use PhpAmqpLib\Wire\AMQPWriter;
0 ignored issues
show
Bug introduced by
The type PhpAmqpLib\Wire\AMQPWriter was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
8
9
/**
10
 * The rabbit receiver will be listening infinitefor messages in your queue
11
 * Handler method can be overwrited by main class for handle the received messages
12
 *
13
 * @version 0.1.0
14
 * @author Murilo Chianfa <github.com/MuriloChianfa>
15
 * @package Source\Infra\Queue\RabbitMQ\RabbitMQ
16
 */
17
abstract class RabbitReceiver extends RabbitMQ
18
{
19
    /**
20
     * The handler of the messages received from queue
21
     *
22
     * @param mixed $message Received from rabbitmq queue
23
     * @return void
24
     */
25
    protected function handler($message)
26
    {
27
        // Simple print on getted message from the queue
28
        print_r($message->body);
29
    }
30
31
    /**
32
     * Infinite loop for listen the queue messages
33
     *
34
     * @return void
35
     */
36
    protected function consume()
37
    {
38
        $callback = function($message) { $this->handler($message); };
39
        $this->channel->basic_consume($this->getQueue(), 'consumer', false, $this->getReply(), false, false, $callback);
40
        
41
        $pid = pcntl_fork();
42
43
        if ($pid == -1) {
44
            echo 'annot fork consume proccess...';
45
            return;
46
        }
47
48
        if ($pid) {
49
            while (true) {
50
                $this->send_heartbeat();
51
                sleep(10);
52
            }
53
        } else {
54
            while (count($this->channel->callbacks))
55
            {
56
                $this->channel->wait();
57
            }
58
        }
59
60
        $this->closeConnection();
61
    }
62
63
    /**
64
     * work on PhpAmqpLib 2.9.2
65
     */
66
    private function send_heartbeat()
67
    {
68
        $packet = new AMQPWriter();
69
        $packet->write_octet(8);
70
        $packet->write_short(0);
71
        $packet->write_long(0);
72
        $packet->write_octet(0xCE);
73
74
        $this->connection->getIO()->write($packet->getvalue());
75
    }
76
}
77