Completed
Push — master ( 365f54...bdc15b )
by Christopher
10:19 queued 08:11
created

DoormanConnector::query()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 12
ccs 6
cts 6
cp 1
rs 9.4286
cc 1
eloc 6
nc 1
nop 2
crap 1
1
<?php
2
3
namespace AsyncPHP\Icicle\Database\Connector;
4
5
use AsyncPHP\Doorman\Manager;
6
use AsyncPHP\Doorman\Manager\ProcessManager;
7
use AsyncPHP\Doorman\Task\ProcessCallbackTask;
8
use AsyncPHP\Icicle\Database\Connector;
9
use AsyncPHP\Remit\Client;
10
use AsyncPHP\Remit\Client\ZeroMqClient;
11
use AsyncPHP\Remit\Location\InMemoryLocation;
12
use AsyncPHP\Remit\Server;
13
use AsyncPHP\Remit\Server\ZeroMqServer;
14
use Aura\Sql\ExtendedPdo;
15
use Icicle\Loop;
16
use Icicle\Promise\Deferred;
17
use Icicle\Promise\PromiseInterface;
18
use InvalidArgumentException;
19
use PDO;
20
21
final class DoormanConnector implements Connector
{
    /**
     * Counter used to build the identifier of the next query ("d1", "d2", ...).
     *
     * @var int
     */
    private $id = 1;

    /**
     * Deferreds still waiting for a reply, keyed by query identifier.
     *
     * @var array
     */
    private $deferred = [];

    /**
     * Doorman manager that runs the database worker task.
     *
     * @var Manager
     */
    private $manager;

    /**
     * Remit server on which this process receives "r" (result) and "e" (error) events.
     *
     * @var Server
     */
    private $server;

    /**
     * Remit client through which this process sends "q" (query) events.
     *
     * @var Client
     */
    private $client;

    /**
     * Validates the configuration, sets up the Remit endpoints, registers the
     * worker task with Doorman and wires the result/error listeners.
     *
     * @inheritdoc
     *
     * @param array $config
     *
     * @return PromiseInterface Already resolved when this method returns; it does
     *                          not wait for the worker to open its database connection.
     *
     * @throws InvalidArgumentException
     */
    public function connect(array $config)
    {
        $this->manager = new ProcessManager();

        $this->validate($config);
        $this->connectRemit($config);
        $this->connectDoorman($config);

        // A result arrived: settle the matching deferred and forget it.
        $this->server->addListener("r", function ($result, $id) {
            if (isset($this->deferred[$id])) {
                $this->deferred[$id]->resolve($result);
                unset($this->deferred[$id]);
            }
        });

        // An error arrived: reject the matching deferred and forget it.
        $this->server->addListener("e", function ($error, $id) {
            if (isset($this->deferred[$id])) {
                $this->deferred[$id]->reject($error);
                unset($this->deferred[$id]);
            }
        });

        // Poll the Remit server from the event loop so the listeners above fire.
        Loop\periodic(0, function () {
            $this->server->tick();
        });

        $this->manager->tick();

        // The docblock promises a PromiseInterface; previously nothing was returned.
        $connected = new Deferred();
        $connected->resolve();

        return $connected->getPromise();
    }

    /**
     * Checks that the "remit" section of the configuration is complete.
     *
     * Only the "zeromq" driver is recognised, and it requires a port for both
     * the server and the client endpoint.
     *
     * @param array $config
     *
     * @throws InvalidArgumentException
     */
    private function validate(array $config)
    {
        if (!isset($config["remit"])) {
            throw new InvalidArgumentException("Undefined remit");
        }

        $remit = $config["remit"];

        foreach (["driver", "server", "client"] as $key) {
            if (!isset($remit[$key])) {
                throw new InvalidArgumentException("Undefined remit {$key}");
            }
        }

        if ($remit["driver"] !== "zeromq") {
            throw new InvalidArgumentException("Unrecognised remit driver");
        }

        foreach (["server", "client"] as $endpoint) {
            if (!isset($remit[$endpoint]["port"])) {
                throw new InvalidArgumentException("Undefined remit {$endpoint} port");
            }
        }
    }

    /**
     * Creates the Remit server and client used by this (parent) process.
     *
     * The host of each endpoint defaults to 127.0.0.1 when not configured.
     *
     * @param array $config
     */
    private function connectRemit(array $config)
    {
        if ($config["remit"]["driver"] !== "zeromq") {
            return;
        }

        $defaults = ["host" => "127.0.0.1"];

        $server = array_merge($defaults, $config["remit"]["server"]);
        $client = array_merge($defaults, $config["remit"]["client"]);

        $this->server = new ZeroMqServer(
            new InMemoryLocation($server["host"], $server["port"])
        );

        $this->client = new ZeroMqClient(
            new InMemoryLocation($client["host"], $client["port"])
        );
    }

    /**
     * Registers the worker task that owns the PDO connection.
     *
     * The worker mirrors the parent's endpoints: it listens where the parent's
     * client sends and sends to where the parent's server listens. Each "q"
     * event is answered with "r" (rows) or "e" (error message) plus the id.
     *
     * @param array $config
     */
    private function connectDoorman(array $config)
    {
        if ($config["remit"]["driver"] !== "zeromq") {
            return;
        }

        $server = $config["remit"]["server"];
        $client = $config["remit"]["client"];

        $task = new ProcessCallbackTask(function () use ($config, $server, $client) {
            // Distinct local names: the imported variables are copies, so
            // reassigning them would only shadow them and obscure the intent.
            $settings = array_merge([
                "host" => "127.0.0.1",
                "port" => 3306,
            ], $config);

            $parentServer = array_merge(["host" => "127.0.0.1"], $server);
            $parentClient = array_merge(["host" => "127.0.0.1"], $client);

            // Listen on the location the parent's client sends to...
            $remitServer = new ZeroMqServer(
                new InMemoryLocation($parentClient["host"], $parentClient["port"])
            );

            // ...and reply to the location the parent's server listens on.
            $remitClient = new ZeroMqClient(
                new InMemoryLocation($parentServer["host"], $parentServer["port"])
            );

            $database = $settings["database"];
            $host = $settings["host"];
            $port = $settings["port"];
            $username = $settings["username"];
            $password = $settings["password"];

            $connection = new ExtendedPdo(
                new PDO(
                    "mysql:dbname={$database};host={$host};port={$port}",
                    $username,
                    $password
                )
            );

            $remitServer->addListener("q", function ($query, $values, $id) use ($remitClient, $connection) {
                try {
                    $rows = $connection->fetchAll($query, $values);
                } catch (\Exception $exception) {
                    // Without this the parent never hears back and the promise
                    // for this query stays pending forever. Only the message is
                    // sent; NOTE(review): presumably event payloads must be
                    // serialisable, which exception objects may not be - confirm.
                    $remitClient->emit("e", [$exception->getMessage(), $id]);

                    return;
                }

                $remitClient->emit("r", [$rows, $id]);
            });

            Loop\periodic(0, function () use ($remitServer) {
                $remitServer->tick();
            });

            Loop\run();
        });

        $this->manager->addTask($task);
    }

    /**
     * Sends a query to the worker and returns a promise for its outcome.
     *
     * The promise is settled by the "r" / "e" listeners registered in connect().
     *
     * @inheritdoc
     *
     * @param string $query
     * @param array $values
     *
     * @return PromiseInterface
     *
     * @throws InvalidArgumentException
     */
    public function query($query, $values)
    {
        $key = "d" . $this->id++;

        $deferred = new Deferred();

        // Register before emitting so a reply can never arrive for an id that
        // is not yet known.
        $this->deferred[$key] = $deferred;

        $this->client->emit("q", [$query, $values, $key]);

        return $deferred->getPromise();
    }
}
243