DoormanConnector::connect()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2.032

Importance

Changes 0
Metric Value
dl 0
loc 15
c 0
b 0
f 0
ccs 8
cts 10
cp 0.8
rs 9.4285
cc 2
eloc 8
nc 2
nop 1
crap 2.032
1
<?php
2
3
namespace AsyncPHP\Icicle\Database\Connector;
4
5
use AsyncPHP\Doorman\Manager;
6
use AsyncPHP\Doorman\Manager\ProcessManager;
7
use AsyncPHP\Icicle\Database\Connector;
8
use AsyncPHP\Remit\Client;
9
use AsyncPHP\Remit\Client\ZeroMqClient;
10
use AsyncPHP\Remit\Location\InMemoryLocation;
11
use AsyncPHP\Remit\Server;
12
use AsyncPHP\Remit\Server\ZeroMqServer;
13
use Icicle\Loop;
14
use Icicle\Promise\Deferred;
15
use Icicle\Promise\PromiseInterface;
16
use InvalidArgumentException;
17
18
/**
 * Connector that hands queries to a task run by a Doorman process manager
 * and receives the outcome through Remit events.
 *
 * Every query is given a "d<N>" identifier and a Deferred. The "r" and "e"
 * listeners registered in remit() settle the Deferred with the matching
 * identifier; the "dd" listener tears the Remit endpoints down.
 */
final class DoormanConnector implements Connector
{
    /**
     * Counter used to build the identifier of the next query.
     *
     * @var int
     */
    private $id = 1;

    /**
     * Deferred objects still waiting for an "r" or "e" event, keyed by the
     * query identifier ("d1", "d2", ...).
     *
     * @var array
     */
    private $deferred = [];

    /**
     * @var Manager
     */
    private $manager;

    /**
     * @var Server
     */
    private $server;

    /**
     * @var Client
     */
    private $client;

    /**
     * Handle returned by Loop\periodic() for the timer that polls the
     * server, or null while no such timer is active.
     *
     * @var mixed
     */
    private $timer;

    /**
     * @inheritdoc
     *
     * Validates the configuration, creates the Remit server/client pair and
     * then starts a DoormanConnectorTask through a new ProcessManager. When
     * a "log" key is present it is passed to the manager as the log path.
     *
     * NOTE(review): this method has no return statement even though the
     * annotation below promises a PromiseInterface — confirm against the
     * Connector interface before relying on the return value.
     *
     * @param array $config
     *
     * @return PromiseInterface
     *
     * @throws InvalidArgumentException
     */
    public function connect(array $config)
    {
        $config = $this->validate($config);

        // The Remit server must be listening before the task is started.
        $this->remit($config);

        $this->manager = new ProcessManager();

        if (isset($config["log"])) {
            $this->manager->setLogPath($config["log"]);
        }

        $this->manager->addTask(new DoormanConnectorTask($config));
        $this->manager->tick();
    }

    /**
     * Checks that the "remit" section of the configuration is complete.
     *
     * Only the "zeromq" driver is accepted, and it requires a port for both
     * the server and the client. The configuration is returned unchanged.
     *
     * @param array $config
     *
     * @return array
     *
     * @throws InvalidArgumentException
     */
    private function validate(array $config)
    {
        if (!isset($config["remit"])) {
            throw new InvalidArgumentException("Undefined remit");
        }

        if (!isset($config["remit"]["driver"])) {
            throw new InvalidArgumentException("Undefined remit driver");
        }

        if (!isset($config["remit"]["server"])) {
            throw new InvalidArgumentException("Undefined remit server");
        }

        if (!isset($config["remit"]["client"])) {
            throw new InvalidArgumentException("Undefined remit client");
        }

        if ($config["remit"]["driver"] !== "zeromq") {
            throw new InvalidArgumentException("Unrecognised remit driver");
        }

        if (!isset($config["remit"]["server"]["port"])) {
            throw new InvalidArgumentException("Undefined remit server port");
        }

        if (!isset($config["remit"]["client"]["port"])) {
            throw new InvalidArgumentException("Undefined remit client port");
        }

        // TODO: validate connection details

        return $config;
    }

    /**
     * Creates the Remit server and client, registers the event listeners
     * and starts the timer that polls the server.
     *
     * The host of either endpoint defaults to 127.0.0.1. Because $config is
     * received by value, those defaults are local to this method.
     *
     * @param array $config
     */
    private function remit(array $config)
    {
        if ($config["remit"]["driver"] === "zeromq") {
            $config["remit"]["server"] += [
                "host" => "127.0.0.1",
            ];

            $config["remit"]["client"] += [
                "host" => "127.0.0.1",
            ];

            $this->server = new ZeroMqServer(
                new InMemoryLocation(
                    $config["remit"]["server"]["host"],
                    $config["remit"]["server"]["port"]
                )
            );

            $this->client = new ZeroMqClient(
                new InMemoryLocation(
                    $config["remit"]["client"]["host"],
                    $config["remit"]["client"]["port"]
                )
            );
        }

        // "r": resolve the Deferred registered under the given identifier.
        $this->server->addListener("r", function ($result, $id) {
            if (isset($this->deferred[$id])) {
                $this->deferred[$id]->resolve($result);
                unset($this->deferred[$id]);
            }
        });

        // "e": reject the Deferred registered under the given identifier.
        $this->server->addListener("e", function ($error, $id) {
            if (isset($this->deferred[$id])) {
                $this->deferred[$id]->reject($error);
                unset($this->deferred[$id]);
            }
        });

        // "dd": stop polling first, so the server is never ticked again
        // after it has been disconnected, then close both endpoints.
        $this->server->addListener("dd", function () {
            $this->stopTimer();

            $this->server->disconnect();
            $this->client->disconnect();
        });

        // A repeated connect() must not leave an earlier timer running.
        $this->stopTimer();

        $this->timer = Loop\periodic(0, function () {
            $this->server->tick();
        });
    }

    /**
     * Stops the polling timer, if one is active, and forgets its handle.
     */
    private function stopTimer()
    {
        if ($this->timer !== null) {
            $this->timer->stop();
            $this->timer = null;
        }
    }

    /**
     * @inheritdoc
     *
     * Registers a Deferred under a fresh "d<N>" identifier, emits a "q"
     * event carrying the query, its values and that identifier, and returns
     * the promise of the Deferred. The client is only created by connect(),
     * so connect() has to be called first.
     *
     * @param string $query
     * @param array $values
     *
     * @return PromiseInterface
     *
     * @throws InvalidArgumentException
     */
    public function query($query, $values = [])
    {
        $key = "d" . $this->id++;

        $deferred = new Deferred();
        $this->deferred[$key] = $deferred;

        $this->client->emit("q", [$query, $values, $key]);

        return $deferred->getPromise();
    }

    /**
     * @inheritdoc
     *
     * Emits a "d" event through the client. The local endpoints are closed
     * by the "dd" listener registered in remit(), not by this method.
     */
    public function disconnect()
    {
        $this->client->emit("d");
    }
}
194