Completed
Push — master ( cc5276...9a47a0 )
by Kamil
03:04
created

Database::query()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 22
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 0
cts 16
cp 0
rs 9.2
c 0
b 0
f 0
cc 1
eloc 14
nc 1
nop 2
crap 2
1
<?php
2
3
namespace Dazzle\MySQL;
4
5
use Dazzle\Event\BaseEventEmitter;
6
use Dazzle\Loop\LoopAwareTrait;
7
use Dazzle\Loop\LoopInterface;
8
use Dazzle\MySQL\Command\Command;
9
use Dazzle\MySQL\Command\CommandInterface;
10
use Dazzle\MySQL\Command\Concrete\AuthCommand;
11
use Dazzle\MySQL\Command\Concrete\PingCommand;
12
use Dazzle\MySQL\Command\Concrete\QueryCommand;
13
use Dazzle\MySQL\Command\Concrete\QuitCommand;
14
use Dazzle\MySQL\Protocol\ProtocolParser;
15
use Dazzle\Promise\Promise;
16
use Dazzle\Promise\PromiseInterface;
17
use Dazzle\Socket\Socket;
18
use Dazzle\Socket\SocketInterface;
19
use Exception;
20
use InvalidArgumentException;
21
use RuntimeException;
22
23
class Database extends BaseEventEmitter implements DatabaseInterface
24
{
25
    use LoopAwareTrait;
26
27
    // Lifecycle states of the connection. The numeric values matter because
    // they are compared by range: doCommand() only enqueues commands while the
    // state lies between STATE_CONNECT_PENDING and STATE_AUTH_SUCCEEDED
    // (inclusive), and handleConnectionClosed() reports an unexpected
    // disconnect for any state numerically below STATE_CLOSEING.
    const STATE_INIT                = 0;
28
    const STATE_CONNECT_PENDING     = 3;
29
    const STATE_CONNECT_FAILED      = 1;
30
    const STATE_CONNECT_SUCCEEDED   = 4;
31
    // NOTE(review): 8 lies outside the 3..5 range accepted by doCommand() and
    // above STATE_CLOSEING; confirm this ordering is intended.
    const STATE_AUTH_PENDING        = 8;
32
    const STATE_AUTH_FAILED         = 2;
33
    const STATE_AUTH_SUCCEEDED      = 5;
34
    const STATE_CLOSEING            = 6;
35
    const STATE_STOPPED             = 7;
36
37
    /** @var mixed[] Options built by createConfig(); readable and writable through getOption()/setOption(). */
    protected $config;
38
39
    /** @var mixed Value received by the 'authenticated' handler in start(); exposed by getServerOptions(). */
    protected $serverOptions;
40
41
    /** @var mixed Object returned by createExecutor(); doCommand() hands every command to it. */
    protected $executor;
42
43
    /** @var int Current lifecycle state, one of the STATE_* constants. */
    protected $state = self::STATE_INIT;
44
45
    /** @var mixed Underlying connection stream (presumably the socket obtained in start(); verify). */
    protected $stream;
46
47
    /** @var ProtocolParser Parser created in start() once the connection promise has resolved. */
    protected $parser;
48
49
    /**
     * Build a database client bound to the given loop.
     *
     * The executor is created before the configuration is merged, in that
     * order; no connection is opened here.
     *
     * @param LoopInterface $loop   event loop stored on the instance
     * @param mixed[]       $config options overriding the defaults of createConfig()
     */
    public function __construct(LoopInterface $loop, $config = [])
    {
        $this->loop     = $loop;
        $this->executor = $this->createExecutor();
        $this->config   = $this->createConfig($config);
    }
59
60
    /**
     * Stub: not implemented yet.
     *
     * The body is empty, so every call currently yields null.
     *
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        // TODO: not implemented.
    }
68
69
    /**
     * Stub: not implemented yet.
     *
     * The body is empty, so calling it has no effect and yields null.
     *
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        // TODO: not implemented.
    }
77
78
    /**
     * Stub: not implemented yet.
     *
     * The body is empty, so calling it has no effect and yields null.
     *
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        // TODO: not implemented.
    }
86
87
    /**
     * Stub: not implemented yet.
     *
     * The body is empty, so every call currently yields null regardless of
     * the value reported by getState().
     *
     * @override
     * @inheritDoc
     */
    public function isStarted()
    {
        // TODO: not implemented.
    }
95
96
    /**
     * Open the connection and authenticate against the server.
     *
     * The returned promise is resolved with the server options delivered by
     * the AuthCommand's 'authenticated' event. It is rejected when the
     * AuthCommand emits 'error' or when the promise returned by connect()
     * is rejected.
     *
     * @override
     * @inheritDoc
     */
    public function start()
    {
        return new Promise(function($resolve, $reject) {

            $this->state = self::STATE_CONNECT_PENDING;
            $options = $this->config;

            // Authentication failed: record it and reject the start promise.
            $errorHandler = function ($reason) use ($reject) {
                $this->state = self::STATE_AUTH_FAILED;
                return $reject($reason);
            };

            // Authentication succeeded: remember what the server announced.
            $connectedHandler = function ($serverOptions) use ($resolve) {
                $this->state = self::STATE_AUTH_SUCCEEDED;
                $this->serverOptions = $serverOptions;
                return $resolve($serverOptions);
            };

            // Connection failed: besides emitting 'error' (as before), mark the
            // state and reject, otherwise the start promise would never settle.
            $connectFailedHandler = function ($reason) use ($reject) {
                $this->state = self::STATE_CONNECT_FAILED;
                $this->handleConnectionError($reason);
                return $reject($reason);
            };

            $this
                ->connect()
                ->then(function ($stream) use ($options, $errorHandler, $connectedHandler) {
                    // Store the stream on the instance. Assigning to a local
                    // copy of $this->stream would leave the property unset.
                    $this->stream = $stream;

                    $stream->on('error', [ $this, 'handleConnectionError' ]);
                    $stream->on('close', [ $this, 'handleConnectionClosed' ]);

                    $parser = $this->parser = new ProtocolParser($stream, $this->executor);
                    $parser->setOptions($options);

                    // Listeners are attached before the parser is started.
                    $command = $this->doCommand(new AuthCommand($this));
                    $command->on('authenticated', $connectedHandler);
                    $command->on('error', $errorHandler);

                    $parser->start();
                }, $connectFailedHandler);
        });
    }
141
142
    /**
     * Send a quit command and close the client.
     *
     * The returned promise is resolved with this instance once the command
     * emits 'success' (after 'end' and 'close' have been emitted here), and
     * rejected with the reported reason if the command emits 'error'.
     *
     * @override
     * @inheritDoc
     */
    public function stop()
    {
        return new Promise(function($resolve, $reject) {
            // Queue the command while the state is still in the range that
            // doCommand() accepts; the state is switched only afterwards.
            $command = $this->doCommand(new QuitCommand($this));

            $command->on('success', function() use($resolve) {
                $this->state = self::STATE_STOPPED;
                $this->emit('end', [ $this ]);
                $this->emit('close', [ $this ]);
                $resolve($this);
            });

            // Without this the promise would stay pending forever on failure.
            $command->on('error', function($reason) use($reject) {
                $reject($reason);
            });

            $this->state = self::STATE_CLOSEING;
        });
    }
160
161
    /**
     * Run an SQL statement asynchronously.
     *
     * The statement is wrapped in a Query, the parameters are bound to it and
     * a QueryCommand carrying it is passed to doCommand(). The returned
     * promise is resolved with the command when it emits 'results' or
     * 'success', and rejected with the error when it emits 'error'.
     *
     * @param string  $sql       statement text
     * @param mixed[] $sqlParams values handed to Query::bindParamsFromArray()
     * @return PromiseInterface
     */
    public function query($sql, $sqlParams = [])
    {
        $deferred  = new Promise();
        $statement = new Query($sql);
        $cmd       = new QueryCommand($this);

        $cmd->setQuery($statement);
        $statement->bindParamsFromArray($sqlParams);

        $this->doCommand($cmd);

        // Both completion events settle the promise with the emitted command.
        $fulfil = function ($command) use ($deferred) {
            return $deferred->resolve($command);
        };

        $cmd->on('results', function ($rows, $command) use ($fulfil) {
            return $fulfil($command);
        });
        $cmd->on('error', function ($err) use ($deferred) {
            return $deferred->reject($err);
        });
        $cmd->on('success', $fulfil);

        return $deferred;
    }
190
191
    /**
     * Send a ping command to the server.
     *
     * The returned promise is resolved (without a value) when the command
     * emits 'success' and rejected with the reported reason when it emits
     * 'error'.
     *
     * @return PromiseInterface
     */
    public function ping()
    {
        $promise = new Promise();
        $command = $this->doCommand(new PingCommand($this));

        // Listeners are attached one by one, as in query() and start(), so
        // nothing depends on what on() returns.
        $command->on('error', function ($reason) use ($promise) {
            return $promise->reject($reason);
        });
        $command->on('success', function () use ($promise) {
            return $promise->resolve();
        });

        // The promise was previously created but never handed to the caller.
        return $promise;
    }
203
204
    /**
     * Switch the default database by issuing a USE statement through query().
     *
     * @param string $dbname database name; it is quoted as an identifier here
     * @return PromiseInterface promise returned by query()
     */
    public function selectDB($dbname)
    {
        // Inside a backtick-quoted identifier a literal backtick must be
        // doubled; otherwise the name could terminate the quoting and append
        // arbitrary SQL.
        $escaped = str_replace('`', '``', $dbname);

        return $this->query(sprintf('USE `%s`', $escaped));
    }
208
209
    /**
     * Store a configuration value under the given name.
     *
     * @param string $name  option key
     * @param mixed  $value option value; replaces any existing entry
     * @return $this
     */
    public function setOption($name, $value)
    {
        $this->config[$name] = $value;

        return $this;
    }
215
216
    /**
     * Read a configuration value.
     *
     * The lookup uses isset(), so an option that is missing or whose stored
     * value is null yields $default.
     *
     * @param string $name    option key
     * @param mixed  $default value returned when the option is not set
     * @return mixed
     */
    public function getOption($name, $default = null)
    {
        return isset($this->config[$name]) ? $this->config[$name] : $default;
    }
225
226
    /**
     * Return the current lifecycle state.
     *
     * @return int one of the STATE_* constants
     */
    public function getState()
    {
        return $this->state;
    }
230
231
    /**
     * Forward a connection error to listeners of this object.
     *
     * Emits 'error' with the error and this instance as arguments; the state
     * is not changed here.
     *
     * @param mixed $err error reported by the stream or the connect promise
     */
    public function handleConnectionError($err)
    {
        $arguments = [ $err, $this ];

        $this->emit('error', $arguments);
    }
235
236
    /**
     * React to the stream being closed.
     *
     * When the state is numerically below STATE_CLOSEING the close was not
     * requested: the state becomes STATE_STOPPED and an 'error' event with a
     * RuntimeException is emitted. For any other state nothing happens.
     */
    public function handleConnectionClosed()
    {
        if ($this->state >= self::STATE_CLOSEING)
        {
            return;
        }

        $this->state = self::STATE_STOPPED;
        $this->emit('error', [ new RuntimeException('mysql server has gone away'), $this ]);
    }
244
245
    /**
     * Hand a command to the executor.
     *
     * A command for which equals(Command::INIT_AUTHENTICATE) holds is always
     * accepted and passed to undequeue(). Any other command is enqueued only
     * while the state lies between STATE_CONNECT_PENDING and
     * STATE_AUTH_SUCCEEDED (inclusive).
     *
     * @param CommandInterface $command
     * @return mixed whatever the executor's undequeue()/enqueue() returns
     * @throws Exception when the command cannot be sent in the current state
     */
    protected function doCommand(CommandInterface $command)
    {
        if ($command->equals(Command::INIT_AUTHENTICATE))
        {
            return $this->executor->undequeue($command);
        }

        $acceptsCommands = $this->state >= self::STATE_CONNECT_PENDING
            && $this->state <= self::STATE_AUTH_SUCCEEDED;

        if (!$acceptsCommands)
        {
            throw new Exception("Cann't send command");
        }

        return $this->executor->enqueue($command);
    }
260
261
    /**
     * Return the server options stored by start() after authentication.
     *
     * @return mixed null until the 'authenticated' handler has run
     */
    public function getServerOptions()
    {
        return $this->serverOptions;
    }
265
266
    /**
     * Create a socket for the configured 'endpoint' on this object's loop.
     *
     * @return mixed result of Promise::doResolve() applied to the new Socket
     */
    protected function connect()
    {
        return Promise::doResolve(
            new Socket($this->config['endpoint'], $this->getLoop())
        );
    }
271
272
    /**
     * Build the executor used by doCommand(), bound to this instance.
     *
     * @return Executor
     */
    protected function createExecutor()
    {
        $executor = new Executor($this);

        return $executor;
    }
281
282
    /**
     * Build the configuration array.
     *
     * The built-in defaults are merged with the given options using
     * array_merge(), so a key present in $config replaces the default.
     *
     * @param mixed[] $config caller-supplied options
     * @return mixed[] defaults overlaid with $config
     */
    protected function createConfig($config = [])
    {
        return array_merge(
            [
                'endpoint' => 'tcp://127.0.0.1:3306',
                'user'     => 'root',
                'pass'     => '',
                'dbname'   => '',
            ],
            $config
        );
    }
298
}
299