Completed
Push — master ( b5fdaf...96e878 )
by Christopher
02:15
created

DoormanConnector::connect()   B

Complexity

Conditions 4
Paths 2

Size

Total Lines 32
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4.2841

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 32
ccs 17
cts 23
cp 0.7391
rs 8.5806
cc 4
eloc 18
nc 2
nop 1
crap 4.2841
1
<?php
2
3
namespace AsyncPHP\Icicle\Database\Connector;
4
5
use AsyncPHP\Doorman\Manager;
6
use AsyncPHP\Doorman\Manager\ProcessManager;
7
use AsyncPHP\Doorman\Task\ProcessCallbackTask;
8
use AsyncPHP\Icicle\Database\Connector;
9
use AsyncPHP\Remit\Client;
10
use AsyncPHP\Remit\Client\ZeroMqClient;
11
use AsyncPHP\Remit\Location\InMemoryLocation;
12
use AsyncPHP\Remit\Server;
13
use AsyncPHP\Remit\Server\ZeroMqServer;
14
use Aura\Sql\ExtendedPdo;
15
use Icicle\Loop;
16
use Icicle\Promise\Deferred;
17
use Icicle\Promise\PromiseInterface;
18
use InvalidArgumentException;
19
use PDO;
20
21
/**
 * Connector that forwards queries as remit events and resolves promises from the replies.
 *
 * Queries are emitted through a remit client as "q" events; results come back
 * on a remit server as "r" (result) or "e" (error) events, each carrying the id
 * of the query it belongs to. A DoormanConnectorTask is handed to a doorman
 * ProcessManager on connect (presumably the worker that runs the queries —
 * confirm against DoormanConnectorTask).
 */
final class DoormanConnector implements Connector
{
    /**
     * Sequence number used to build the id of the next query.
     *
     * @var int
     */
    private $id = 1;

    /**
     * Deferreds of in-flight queries, keyed by the id sent with the query ("d1", "d2", ...).
     *
     * @var Deferred[]
     */
    private $deferred = [];

    /**
     * @var Manager
     */
    private $manager;

    /**
     * Receives the "r" and "e" events for in-flight queries.
     *
     * @var Server
     */
    private $server;

    /**
     * Emits the "q" events.
     *
     * @var Client
     */
    private $client;

    /**
     * @inheritdoc
     *
     * Validates the configuration, sets up the remit server/client pair,
     * registers the reply listeners and hands a DoormanConnectorTask to the
     * process manager.
     *
     * @param array $config Must contain a "remit" section (see validate());
     *                      may contain "log", which is passed to the manager
     *                      as its log path.
     *
     * @return PromiseInterface Already resolved when returned: the setup in
     *                          this method is synchronous and nothing here
     *                          waits for the task to signal readiness.
     *
     * @throws InvalidArgumentException If the remit configuration is invalid.
     */
    public function connect(array $config)
    {
        // Validate before touching any state, so a bad configuration does not
        // leave the connector half-initialised.
        $this->validate($config);

        $this->manager = new ProcessManager();

        if (isset($config["log"])) {
            $this->manager->setLogPath($config["log"]);
        }

        $this->connectRemit($config);

        $this->server->addListener("r", function ($result, $id) {
            $deferred = $this->release($id);

            if ($deferred !== null) {
                $deferred->resolve($result);
            }
        });

        $this->server->addListener("e", function ($error, $id) {
            $deferred = $this->release($id);

            if ($deferred !== null) {
                $deferred->reject($error);
            }
        });

        // Tick the remit server from the event loop through a periodic timer
        // with an interval of 0 (presumably this is what polls for incoming
        // "r"/"e" events — confirm against the remit Server implementation).
        Loop\periodic(0, function () {
            $this->server->tick();
        });

        $this->manager->addTask(new DoormanConnectorTask($config));
        $this->manager->tick();

        // Honour the documented return type; previously nothing was returned.
        $connected = new Deferred();
        $connected->resolve();

        return $connected->getPromise();
    }

    /**
     * Checks that the "remit" section of the configuration is complete.
     *
     * Only the "zeromq" driver is recognised; it additionally requires a
     * "port" for both the server and the client.
     *
     * @param array $config
     *
     * @throws InvalidArgumentException Naming the first missing or
     *                                  unrecognised setting.
     */
    private function validate(array $config)
    {
        if (!isset($config["remit"])) {
            throw new InvalidArgumentException("Undefined remit");
        }

        foreach (["driver", "server", "client"] as $key) {
            if (!isset($config["remit"][$key])) {
                throw new InvalidArgumentException("Undefined remit {$key}");
            }
        }

        if ($config["remit"]["driver"] !== "zeromq") {
            throw new InvalidArgumentException("Unrecognised remit driver");
        }

        foreach (["server", "client"] as $endpoint) {
            if (!isset($config["remit"][$endpoint]["port"])) {
                throw new InvalidArgumentException("Undefined remit {$endpoint} port");
            }
        }
    }

    /**
     * Creates the remit server and client from an already validated configuration.
     *
     * @param array $config
     */
    private function connectRemit(array $config)
    {
        if ($config["remit"]["driver"] === "zeromq") {
            $this->server = new ZeroMqServer(
                $this->location($config["remit"]["server"])
            );

            $this->client = new ZeroMqClient(
                $this->location($config["remit"]["client"])
            );
        }
    }

    /**
     * Builds the location for one remit endpoint, defaulting the host to 127.0.0.1.
     *
     * @param array $endpoint Endpoint settings; "port" is required, "host" is optional.
     *
     * @return InMemoryLocation
     */
    private function location($endpoint)
    {
        $endpoint = array_merge([
            "host" => "127.0.0.1",
        ], $endpoint);

        return new InMemoryLocation($endpoint["host"], $endpoint["port"]);
    }

    /**
     * Removes and returns the deferred registered under the given query id.
     *
     * @param string $id
     *
     * @return Deferred|null Null when no query with this id is in flight, in
     *                       which case the event is ignored by the caller.
     */
    private function release($id)
    {
        if (!isset($this->deferred[$id])) {
            return null;
        }

        $deferred = $this->deferred[$id];
        unset($this->deferred[$id]);

        return $deferred;
    }

    /**
     * @inheritdoc
     *
     * Emits the query as a "q" event together with a fresh id and returns a
     * promise that is settled when an "r" or "e" event with that id arrives.
     *
     * @param string $query
     * @param array $values
     *
     * @return PromiseInterface
     *
     * @throws InvalidArgumentException
     * @throws \LogicException If called before connect().
     */
    public function query($query, $values)
    {
        if ($this->client === null) {
            throw new \LogicException("Cannot query before connect() has been called");
        }

        $id = "d" . $this->id++;
        $deferred = new Deferred();

        // Register before emitting so a reply can never arrive for an id that
        // is not yet known to the listeners.
        $this->deferred[$id] = $deferred;

        try {
            $this->client->emit("q", [$query, $values, $id]);
        } catch (\Exception $exception) {
            // The query never left; do not keep a deferred nobody will settle.
            unset($this->deferred[$id]);

            throw $exception;
        }

        return $deferred->getPromise();
    }
}
182