Completed
Push — master ( 9a47a0...be1187 )
by Kamil
02:42
created

Database::query()   B

Complexity

Conditions 3
Paths 1

Size

Total Lines 26
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 26
ccs 0
cts 20
cp 0
rs 8.8571
cc 3
eloc 18
nc 1
nop 2
crap 12
1
<?php
2
3
namespace Dazzle\MySQL;
4
5
use Dazzle\Event\BaseEventEmitter;
6
use Dazzle\Loop\LoopAwareTrait;
7
use Dazzle\Loop\LoopInterface;
8
use Dazzle\MySQL\Command\Command;
9
use Dazzle\MySQL\Command\CommandInterface;
10
use Dazzle\MySQL\Command\Concrete\AuthCommand;
11
use Dazzle\MySQL\Command\Concrete\PingCommand;
12
use Dazzle\MySQL\Command\Concrete\QueryCommand;
13
use Dazzle\MySQL\Command\Concrete\QuitCommand;
14
use Dazzle\MySQL\Protocol\ProtocolParser;
15
use Dazzle\Promise\Promise;
16
use Dazzle\Promise\PromiseInterface;
17
use Dazzle\Socket\Socket;
18
use Dazzle\Socket\SocketInterface;
19
use Exception;
20
use InvalidArgumentException;
21
use RuntimeException;
22
23
/**
 * Asynchronous MySQL client bound to an event loop.
 *
 * start() opens a socket to the configured endpoint, wires a ProtocolParser to it and issues the
 * authentication command; afterwards query(), ping() and stop() hand commands to the executor and
 * report their outcome through promises. Connection-level failures are additionally emitted as
 * 'error' events on this object.
 */
class Database extends BaseEventEmitter implements DatabaseInterface
{
    use LoopAwareTrait;

    /**
     * Connection lifecycle states.
     *
     * The numeric values are significant and must not be renumbered: doCommand() only queues
     * commands while the state lies in STATE_CONNECT_PENDING..STATE_AUTH_SUCCEEDED, and
     * handleConnectionClosed() treats every state below STATE_CLOSEING as an unexpected close.
     *
     * NOTE(review): STATE_AUTH_PENDING lies outside that range and is never assigned in this class.
     * The misspelled name STATE_CLOSEING is kept as-is because it is part of the public interface.
     *
     * @var int
     */
    const STATE_INIT                = 0;
    const STATE_CONNECT_PENDING     = 3;
    const STATE_CONNECT_FAILED      = 1;
    const STATE_CONNECT_SUCCEEDED   = 4;
    const STATE_AUTH_PENDING        = 8;
    const STATE_AUTH_FAILED         = 2;
    const STATE_AUTH_SUCCEEDED      = 5;
    const STATE_CLOSEING            = 6;
    const STATE_STOPPED             = 7;

    /**
     * Effective configuration: defaults from createConfig() merged with user-supplied values.
     *
     * @var mixed[]
     */
    protected $config;

    /**
     * Payload of the 'authenticated' event received during start(); null until then.
     *
     * @var mixed
     */
    protected $serverOptions;

    /**
     * Command queue used by doCommand().
     *
     * @var Executor
     */
    protected $executor;

    /**
     * Current lifecycle state, one of the STATE_* constants.
     *
     * @var int
     */
    protected $state = self::STATE_INIT;

    /**
     * Stream resolved by connect(); null until start() has connected.
     *
     * @var mixed
     */
    protected $stream;

    /**
     * Protocol parser attached to the stream; null until start() has connected.
     *
     * @var ProtocolParser
     */
    protected $parser;

    /**
     * @param LoopInterface $loop
     * @param mixed[] $config
     */
    public function __construct(LoopInterface $loop, $config = [])
    {
        $this->loop = $loop;
        $this->executor = $this->createExecutor();
        $this->config = $this->createConfig($config);
    }

    /**
     * Not implemented yet; currently returns nothing (null).
     *
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        // TODO
    }

    /**
     * Not implemented yet; currently a no-op.
     *
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        // TODO
    }

    /**
     * Not implemented yet; currently a no-op.
     *
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        // TODO
    }

    /**
     * Not implemented yet; currently returns nothing (null).
     *
     * @override
     * @inheritDoc
     */
    public function isStarted()
    {
        // TODO
    }

    /**
     * Connect to the configured endpoint and authenticate.
     *
     * The returned promise resolves with the server options delivered by the 'authenticated'
     * event. It is rejected when the connection cannot be established (state becomes
     * STATE_CONNECT_FAILED and an 'error' event is emitted) or when the authentication command
     * reports an error (state becomes STATE_AUTH_FAILED).
     *
     * @override
     * @inheritDoc
     */
    public function start()
    {
        return new Promise(function($resolve, $reject) {

            $this->state = self::STATE_CONNECT_PENDING;
            $options = $this->config;

            $errorHandler = function($reason) use($reject) {
                $this->state = self::STATE_AUTH_FAILED;
                return $reject($reason);
            };

            $connectedHandler = function($serverOptions) use($resolve) {
                $this->state = self::STATE_AUTH_SUCCEEDED;
                $this->serverOptions = $serverOptions;
                return $resolve($serverOptions);
            };

            // Without this handler a failed connection would only emit 'error' and leave the
            // promise returned by start() pending forever.
            $connectFailedHandler = function($err) use($reject) {
                $this->state = self::STATE_CONNECT_FAILED;
                $this->handleConnectionError($err);
                return $reject($err);
            };

            $this
                ->connect()
                ->then(
                    function($stream) use($options, $errorHandler, $connectedHandler) {
                        // Keep the stream on the instance; assigning to a local copy would leave
                        // the $stream property unset.
                        $this->stream = $stream;

                        $stream->on('error', [ $this, 'handleConnectionError' ]);
                        $stream->on('close', [ $this, 'handleConnectionClosed' ]);

                        $parser = $this->parser = new ProtocolParser($stream, $this->executor);
                        $parser->setOptions($options);

                        $command = $this->doCommand(new AuthCommand($this));
                        $command->on('authenticated', $connectedHandler);
                        $command->on('error', $errorHandler);

                        $parser->start();
                    },
                    $connectFailedHandler
                );
        });
    }

    /**
     * Send the quit command and shut the client down.
     *
     * On success the state becomes STATE_STOPPED, 'end' and 'close' are emitted and the promise
     * resolves with this instance. If the quit command reports an error the promise is rejected
     * with that reason.
     *
     * @override
     * @inheritDoc
     */
    public function stop()
    {
        return new Promise(function($resolve, $reject) {
            $command = $this->doCommand(new QuitCommand($this));

            // Listeners are registered on the command itself rather than chained on the return
            // value of on(), so this does not depend on what the emitter's on() returns.
            $command->on('success', function() use($resolve) {
                $this->state = self::STATE_STOPPED;
                $this->emit('end', [ $this ]);
                $this->emit('close', [ $this ]);
                $resolve($this);
            });
            $command->on('error', function($reason) use($reject) {
                $reject($reason);
            });

            // Must happen after doCommand(): once the state is STATE_CLOSEING, doCommand()
            // refuses to queue anything, including the quit command itself.
            $this->state = self::STATE_CLOSEING;
        });
    }

    /**
     * Do a async query.
     *
     * The promise resolves with the QueryCommand once it reports 'results' or 'success' without
     * an error, and is rejected with the command's error otherwise.
     *
     * @param string $sql
     * @param mixed[] $sqlParams
     * @return PromiseInterface
     */
    public function query($sql, $sqlParams = [])
    {
        $promise = new Promise();
        $query   = new Query($sql);
        $command = new QueryCommand($this);
        $command->setQuery($query);

        $query->bindParamsFromArray($sqlParams);
        $this->doCommand($command);

        // Shared by 'results' and 'success': both events carry the command and settle the
        // promise according to the command's own error flag.
        $settle = function($command) use($promise) {
            return $command->hasError()
                ? $promise->reject($command->getError())
                : $promise->resolve($command);
        };

        $command->on('results', function($rows, $command) use($settle) {
            return $settle($command);
        });
        $command->on('error', function($err) use($promise) {
            return $promise->reject($err);
        });
        $command->on('success', $settle);

        return $promise;
    }

    /**
     * Send a ping command to the server.
     *
     * @return PromiseInterface resolved (without a value) on success, rejected with the reason
     *                          reported by the command on error
     */
    public function ping()
    {
        $promise = new Promise();

        $command = $this->doCommand(new PingCommand($this));
        $command->on('error', function($reason) use($promise) {
            return $promise->reject($reason);
        });
        $command->on('success', function() use($promise) {
            return $promise->resolve();
        });

        // The promise has to be handed back, otherwise callers cannot observe the outcome.
        return $promise;
    }

    /**
     * Switch the default schema by issuing a USE statement.
     *
     * @param string $dbname
     * @return PromiseInterface
     */
    public function selectDB($dbname)
    {
        // Inside a backtick-quoted identifier a literal backtick is written as two backticks;
        // escaping it keeps the name from terminating the quoted identifier early.
        $escaped = str_replace('`', '``', $dbname);

        return $this->query(sprintf('USE `%s`', $escaped));
    }

    /**
     * Set a single configuration option.
     *
     * @param string $name
     * @param mixed $value
     * @return $this
     */
    public function setOption($name, $value)
    {
        $this->config[$name] = $value;

        return $this;
    }

    /**
     * Read a single configuration option.
     *
     * @param string $name
     * @param mixed $default returned when the option is missing or set to null
     * @return mixed
     */
    public function getOption($name, $default = null)
    {
        if (isset($this->config[$name]))
        {
            return $this->config[$name];
        }

        return $default;
    }

    /**
     * Return the current lifecycle state.
     *
     * @return int one of the STATE_* constants
     */
    public function getState()
    {
        return $this->state;
    }

    /**
     * Forward a connection-level error as an 'error' event carrying the error and this instance.
     *
     * @param mixed $err
     */
    public function handleConnectionError($err)
    {
        $this->emit('error', [ $err, $this ]);
    }

    /**
     * React to the stream being closed.
     *
     * A close that arrives while the state is still below STATE_CLOSEING was not initiated by
     * stop(), so it is reported as an error and the client is marked as stopped.
     */
    public function handleConnectionClosed()
    {
        if ($this->state < self::STATE_CLOSEING)
        {
            $this->state = self::STATE_STOPPED;
            $this->emit('error', [ new RuntimeException('mysql server has gone away'), $this ]);
        }
    }

    /**
     * Hand a command to the executor.
     *
     * The authentication command is passed to Executor::undequeue() regardless of state; every
     * other command is enqueued only while the state lies between STATE_CONNECT_PENDING and
     * STATE_AUTH_SUCCEEDED (inclusive).
     *
     * @param CommandInterface $command
     * @return mixed whatever the executor returns; callers in this class use it as the command
     * @throws RuntimeException when the current state does not allow sending commands
     */
    protected function doCommand(CommandInterface $command)
    {
        if ($command->equals(Command::INIT_AUTHENTICATE))
        {
            return $this->executor->undequeue($command);
        }

        if ($this->state >= self::STATE_CONNECT_PENDING && $this->state <= self::STATE_AUTH_SUCCEEDED)
        {
            return $this->executor->enqueue($command);
        }

        throw new RuntimeException("Can't send command");
    }

    /**
     * Return the server options stored by start() after successful authentication.
     *
     * @return mixed null before authentication has completed
     */
    public function getServerOptions()
    {
        return $this->serverOptions;
    }

    /**
     * Open a socket to the configured endpoint.
     *
     * @return PromiseInterface promise produced by Promise::doResolve() for the new socket
     */
    protected function connect()
    {
        $socket = new Socket($this->config['endpoint'], $this->getLoop());

        return Promise::doResolve($socket);
    }

    /**
     * Create executor.
     *
     * @return Executor
     */
    protected function createExecutor()
    {
        return new Executor($this);
    }

    /**
     * Create the configuration array by overlaying the given values on the defaults.
     *
     * @param mixed[] $config
     * @return mixed[]
     */
    protected function createConfig($config = [])
    {
        $default = [
            'endpoint' => 'tcp://127.0.0.1:3306',
            'user'     => 'root',
            'pass'     => '',
            'dbname'   => '',
        ];

        return array_merge($default, $config);
    }
}
303