Completed
Push — master ( 96e878...616a56 )
by Christopher
02:07
created

DoormanConnector::connect()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2.032

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 14
ccs 8
cts 10
cp 0.8
rs 9.4286
cc 2
eloc 8
nc 2
nop 1
crap 2.032
1
<?php
2
3
namespace AsyncPHP\Icicle\Database\Connector;
4
5
use AsyncPHP\Doorman\Manager;
6
use AsyncPHP\Doorman\Manager\ProcessManager;
7
use AsyncPHP\Icicle\Database\Connector;
8
use AsyncPHP\Remit\Client;
9
use AsyncPHP\Remit\Client\ZeroMqClient;
10
use AsyncPHP\Remit\Location\InMemoryLocation;
11
use AsyncPHP\Remit\Server;
12
use AsyncPHP\Remit\Server\ZeroMqServer;
13
use Icicle\Loop;
14
use Icicle\Promise\Deferred;
15
use Icicle\Promise\PromiseInterface;
16
use InvalidArgumentException;
17
18
/**
 * Connector that talks to a Doorman-managed task over Remit events.
 *
 * connect() builds a Remit server/client pair, hands a DoormanConnectorTask to
 * a Doorman ProcessManager and starts polling the server from the Icicle loop.
 * query() emits a "q" event through the client and returns a promise that is
 * settled when the server later receives an "r" (result) or "e" (error) event
 * carrying the same identifier.
 */
final class DoormanConnector implements Connector
{
    /**
     * Sequence number used to build the identifier of the next query.
     *
     * @var int
     */
    private $id = 1;

    /**
     * Pending queries, keyed by identifier ("d" followed by the sequence number).
     *
     * @var Deferred[]
     */
    private $deferred = [];

    /**
     * Process manager that owns the connector task; set by connect().
     *
     * @var Manager
     */
    private $manager;

    /**
     * Receives "r" and "e" events; set by connect().
     *
     * @var Server
     */
    private $server;

    /**
     * Emits "q" events; set by connect().
     *
     * @var Client
     */
    private $client;

    /**
     * @inheritdoc
     *
     * Validates the configuration, wires up the Remit transport, then hands a
     * DoormanConnectorTask to a new ProcessManager and ticks that manager once.
     *
     * The returned promise is resolved immediately after the task has been
     * handed to the manager; it does not wait for any message from the task.
     *
     * NOTE(review): every call creates a new server/client pair and registers
     * another periodic loop callback; nothing here tears down a previous one.
     *
     * @param array $config Must contain a "remit" section (see validate());
     *                      an optional "log" entry is passed to the manager's
     *                      setLogPath().
     *
     * @return PromiseInterface
     *
     * @throws InvalidArgumentException When the "remit" section is incomplete
     *                                  or names an unrecognised driver.
     */
    public function connect(array $config)
    {
        $this->validate($config);
        $this->remit($config);

        $this->manager = new ProcessManager();

        if (isset($config["log"])) {
            $this->manager->setLogPath($config["log"]);
        }

        $this->manager->addTask(new DoormanConnectorTask($config));
        $this->manager->tick();

        // Honour the documented return type: callers get a promise they can
        // chain on or yield, instead of null.
        $started = new Deferred();
        $started->resolve();

        return $started->getPromise();
    }

    /**
     * Checks that the "remit" section of the configuration is usable.
     *
     * @param array $config
     *
     * @throws InvalidArgumentException When "remit", its "driver", "server" or
     *                                  "client" entry is missing, when the
     *                                  driver is not "zeromq", or when a
     *                                  server/client "port" is missing.
     */
    private function validate(array $config)
    {
        if (!isset($config["remit"])) {
            throw new InvalidArgumentException("Undefined remit");
        }

        $remit = $config["remit"];

        // Top-level entries are checked in a fixed order so the first missing
        // one determines the message.
        foreach (["driver", "server", "client"] as $key) {
            if (!isset($remit[$key])) {
                throw new InvalidArgumentException("Undefined remit {$key}");
            }
        }

        if ($remit["driver"] !== "zeromq") {
            throw new InvalidArgumentException("Unrecognised remit driver");
        }

        // The zeromq driver needs an explicit port for both endpoints; the
        // host is optional and defaulted in remit().
        foreach (["server", "client"] as $endpoint) {
            if (!isset($remit[$endpoint]["port"])) {
                throw new InvalidArgumentException("Undefined remit {$endpoint} port");
            }
        }
    }

    /**
     * Creates the Remit server and client, registers the result/error
     * listeners and schedules the server to be ticked from the Icicle loop.
     *
     * $config is received by value, so the host defaults applied here are not
     * visible to the caller's copy of the array.
     *
     * @param array $config Already validated configuration.
     */
    private function remit(array $config)
    {
        if ($config["remit"]["driver"] === "zeromq") {
            // Array union only fills in "host" when the caller did not set it.
            $defaults = ["host" => "127.0.0.1"];

            $server = $config["remit"]["server"] + $defaults;
            $client = $config["remit"]["client"] + $defaults;

            $this->server = new ZeroMqServer(
                new InMemoryLocation($server["host"], $server["port"])
            );

            $this->client = new ZeroMqClient(
                new InMemoryLocation($client["host"], $client["port"])
            );
        }

        // "r": a result arrived for the query with identifier $id.
        $this->server->addListener("r", function ($result, $id) {
            if (!isset($this->deferred[$id])) {
                return;
            }

            $this->deferred[$id]->resolve($result);
            unset($this->deferred[$id]);
        });

        // "e": an error arrived for the query with identifier $id.
        $this->server->addListener("e", function ($error, $id) {
            if (!isset($this->deferred[$id])) {
                return;
            }

            $this->deferred[$id]->reject($error);
            unset($this->deferred[$id]);
        });

        // Tick the server from the event loop (periodic callback, interval 0).
        Loop\periodic(0, function () {
            $this->server->tick();
        });
    }

    /**
     * @inheritdoc
     *
     * Registers a Deferred under a fresh identifier, emits a "q" event
     * carrying the query, its values and that identifier, and returns the
     * Deferred's promise. The promise is settled by the "r"/"e" listeners
     * registered during connect().
     *
     * @param string $query
     * @param array $values
     *
     * @return PromiseInterface
     *
     * @throws \LogicException When called before connect() has created the
     *                         client.
     * @throws InvalidArgumentException Declared by the inherited contract;
     *                                  nothing in this method raises it
     *                                  directly.
     */
    public function query($query, $values)
    {
        // Without this guard a premature call would be a fatal call on null.
        if ($this->client === null) {
            throw new \LogicException("Cannot query before connect() has been called");
        }

        $key = "d" . $this->id++;
        $pending = new Deferred();

        // Register before emitting so the listeners can find the entry.
        $this->deferred[$key] = $pending;

        $this->client->emit("q", [$query, $values, $key]);

        return $pending->getPromise();
    }
}
174