Passed
Push — master ( 055ccf...06580a )
by Ondřej
01:58
created

IPCControl::waitForNotification()   A

Complexity

Conditions 5
Paths 5

Size

Total Lines 26
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 19
nc 5
nop 1
dl 0
loc 26
rs 9.3222
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
namespace Ivory\Connection;
4
5
use Ivory\Exception\ConnectionException;
6
7
/**
 * Inter-process communication control built on the PostgreSQL LISTEN/NOTIFY commands.
 *
 * Commands are sent through the injected statement executor; notifications are read directly from the
 * connection handler obtained from the injected connection control.
 */
class IPCControl implements IIPCControl
{
    /** @var ConnectionControl provider of the native connection handler */
    private $connCtl;
    /** @var IStatementExecution executor used for sending the NOTIFY/LISTEN/UNLISTEN commands */
    private $stmtExec;

    /**
     * @param ConnectionControl $connCtl control object to get the connection handler from
     * @param IStatementExecution $stmtExec executor to send the IPC-related commands through
     */
    public function __construct(ConnectionControl $connCtl, IStatementExecution $stmtExec)
    {
        $this->connCtl = $connCtl;
        $this->stmtExec = $stmtExec;
    }

    /**
     * Returns the value of pg_get_pid() called on the current connection handler.
     *
     * @return int process ID of the database backend serving this connection
     */
    public function getBackendPID(): int
    {
        $handler = $this->connCtl->requireConnection();
        return pg_get_pid($handler);
    }

    /**
     * Sends a NOTIFY command for the given channel.
     *
     * @param string $channel channel name; passed to the executor for the <tt>%ident</tt> placeholder
     * @param string|null $payload optional payload; when <tt>null</tt>, NOTIFY is sent without any payload
     */
    public function notify(string $channel, ?string $payload = null): void
    {
        if ($payload === null) {
            $this->stmtExec->command('NOTIFY %ident', $channel);
        } else {
            $this->stmtExec->command('NOTIFY %ident, %s', $channel, $payload);
        }
    }

    /**
     * Sends a LISTEN command for the given channel.
     *
     * @param string $channel channel name; passed to the executor for the <tt>%ident</tt> placeholder
     */
    public function listen(string $channel): void
    {
        $this->stmtExec->command('LISTEN %ident', $channel);
    }

    /**
     * Sends an UNLISTEN command for the given channel.
     *
     * @param string $channel channel name; passed to the executor for the <tt>%ident</tt> placeholder
     */
    public function unlisten(string $channel): void
    {
        $this->stmtExec->command('UNLISTEN %ident', $channel);
    }

    /**
     * Sends the <tt>UNLISTEN *</tt> command.
     */
    public function unlistenAll(): void
    {
        $this->stmtExec->rawCommand('UNLISTEN *');
    }

    /**
     * Fetches a notification from the connection, if pg_get_notify() has one available, without waiting.
     *
     * @return Notification|null the notification, or <tt>null</tt> if pg_get_notify() reported none
     */
    public function pollNotification(): ?Notification
    {
        $handler = $this->connCtl->requireConnection();
        $res = pg_get_notify($handler, PGSQL_ASSOC);
        if ($res === false) {
            return null;
        }

        // an empty payload is reported to the caller as no payload at all
        $payload = ($res['payload'] !== '' ? $res['payload'] : null);
        return new Notification($res['message'], $res['pid'], $payload);
    }

    /**
     * Returns a notification, waiting for the connection socket to become readable if none is available yet.
     *
     * The connection is polled first. If nothing is available, the connection socket is watched using
     * stream_select() for at most the given time, and the connection is polled once more if the socket becomes
     * readable. Note that <tt>null</tt> may thus be returned even before the timeout elapses in case the socket
     * became readable but the second poll yielded no notification.
     *
     * @param int $millisecondTimeout maximal time to wait, in milliseconds; it is split to whole seconds and
     *                                  microseconds and handed over to stream_select() without further validation
     * @return Notification|null the notification, or <tt>null</tt> if none was retrieved
     * @throws ConnectionException if the connection socket cannot be retrieved or selecting the socket fails
     */
    public function waitForNotification(int $millisecondTimeout): ?Notification
    {
        $notification = $this->pollNotification();
        if ($notification !== null) {
            return $notification;
        }

        $handler = $this->connCtl->requireConnection();
        $socket = pg_socket($handler);
        // pg_socket() signals failure by returning false; compare strictly instead of relying on truthiness
        if ($socket === false) {
            throw new ConnectionException(
                'Error retrieving the connection socket while trying to wait for notifications'
            );
        }

        // integer arithmetic only, so that no precision is lost through a float intermediate
        $timeoutSec = intdiv($millisecondTimeout, 1000);
        $timeoutMicrosec = ($millisecondTimeout % 1000) * 1000;

        // stream_select() takes the arrays by reference, hence the variables
        $r = [$socket];
        $w = [];
        $ex = [];
        $selected = stream_select($r, $w, $ex, $timeoutSec, $timeoutMicrosec);
        if ($selected === false) {
            throw new ConnectionException('Error selecting the stream while waiting for notifications');
        }
        if ($selected === 0) {
            // timed out with no activity on the socket
            return null;
        }

        return $this->pollNotification();
    }
}
90