Passed
Push — master ( 6765d2...f88508 )
by Murilo
01:30
created

RabbitReceiver::sendHeartbeat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
c 0
b 0
f 0
dl 0
loc 8
rs 10
cc 1
nc 1
nop 0
1
<?php
2
3
namespace Source\Core\Rabbit;
4
5
use PhpAmqpLib\Wire\AMQPWriter;
0 ignored issues
show
Bug introduced by
The type PhpAmqpLib\Wire\AMQPWriter was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
6
7
/**
8
 * The rabbit receiver will be listening infinitefor messages in your queue
9
 *
10
 * the handler method can be overwrited by main class for handle the received messages
11
 */
12
abstract class RabbitReceiver extends Rabbit // implements Interface
13
{
14
    /**
15
     * The handler of the messages received from queue
16
     * @var mixed $message Received from rabbitmq queue
17
     */
18
    protected function handler($message)
19
    {
20
        // Simple print on getted message from the queue
21
        print_r($message->body);
22
    }
23
24
    /**
25
     * Infinite loop for listen the queue messages
26
     */
27
    protected function consume()
28
    {
29
        $callback = function ($message) {
30
            $this->handler($message);
31
        };
32
        $this->channel->basic_consume($this->getQueue(), 'consumer', false, $this->getReply(), false, false, $callback);
33
34
        $pid = pcntl_fork();
35
36
        if ($pid == -1) {
37
            echo "can i help u ?";
38
        } else if ($pid) {
39
            while (true) {
40
                $this->sendHeartbeat();
41
                sleep(10);
42
            }
43
        } else {
44
            while (count($this->channel->callbacks)) {
45
                $this->channel->wait();
46
            }
47
        }
48
49
        $this->closeConnection();
50
    }
51
52
    /**
53
     * work on PhpAmqpLib 2.9.2
54
     */
55
    private function sendHeartbeat()
56
    {
57
        $packet = new AMQPWriter();
58
        $packet->write_octet(8);
59
        $packet->write_short(0);
60
        $packet->write_long(0);
61
        $packet->write_octet(0xCE);
62
        $this->connection->getIO()->write($packet->getvalue());
63
    }
64
}
65