Completed
Push — master ( d1f44d...2f7083 )
by Kamil
02:59
created

Database::query()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 17
ccs 0
cts 12
cp 0
rs 9.4285
cc 1
eloc 10
nc 1
nop 2
crap 2
1
<?php
2
3
namespace Dazzle\MySQL;
4
5
use Dazzle\Event\BaseEventEmitter;
6
use Dazzle\Loop\LoopAwareTrait;
7
use Dazzle\Loop\LoopInterface;
8
use Dazzle\MySQL\Protocol\Command\AuthCommand;
9
use Dazzle\MySQL\Protocol\Command\PingCommand;
10
use Dazzle\MySQL\Protocol\Command\QueryCommand;
11
use Dazzle\MySQL\Protocol\Command\QuitCommand;
12
use Dazzle\MySQL\Protocol\Command;
13
use Dazzle\MySQL\Protocol\CommandInterface;
14
use Dazzle\MySQL\Protocol\ProtocolParser;
15
use Dazzle\MySQL\Protocol\Query;
16
use Dazzle\MySQL\Protocol\QueryInterface;
17
use Dazzle\MySQL\Support\Queue\Queue;
18
use Dazzle\MySQL\Support\Queue\QueueInterface;
19
use Dazzle\MySQL\Support\Transaction\TransactionBox;
20
use Dazzle\MySQL\Support\Transaction\TransactionBoxInterface;
21
use Dazzle\Promise\Promise;
22
use Dazzle\Promise\PromiseInterface;
23
use Dazzle\Socket\Socket;
24
use Dazzle\Socket\SocketInterface;
25
use Dazzle\Throwable\Exception\Runtime\ExecutionException;
26
use RuntimeException;
27
28
class Database extends BaseEventEmitter implements DatabaseInterface
29
{
30
    use LoopAwareTrait;
31
32
    /*
     * Connection life-cycle states.
     *
     * The numeric values are compared with < and >= by isStarted(), doCommand() and
     * handleSocketClose(), so their relative order matters and must not be changed casually.
     */

    /**
     * State assigned by the constructor, before start() has been called.
     *
     * @var int
     */
    const STATE_INIT = 0;

    /**
     * State assigned by start() right before the connection attempt.
     *
     * @var int
     */
    const STATE_CONNECT_PENDING = 4;

    /**
     * Connection attempt failed.
     *
     * @var int
     */
    const STATE_CONNECT_FAILED = 2;

    /**
     * Connection attempt succeeded.
     *
     * @var int
     */
    const STATE_CONNECT_SUCCEEDED = 6;

    /**
     * State assigned by start() once the stream exists and the auth command is about to be queued.
     *
     * @var int
     */
    const STATE_AUTH_PENDING = 5;

    /**
     * State assigned when the auth command reports an error.
     *
     * @var int
     */
    const STATE_AUTH_FAILED = 3;

    /**
     * State assigned when the auth command reports success.
     *
     * @var int
     */
    const STATE_AUTH_SUCCEEDED = 7;

    /**
     * State assigned by stop() after the quit command has been queued.
     *
     * @var int
     */
    const STATE_DISCONNECT_PENDING = 8;

    /**
     * State assigned when the quit command succeeds or the socket closes unexpectedly.
     *
     * @var int
     */
    const STATE_DISCONNECT_SUCCEEDED = 1;
76
77
    /**
     * Effective configuration: defaults from createConfig() merged with the constructor argument.
     *
     * @var mixed[]
     */
    protected $config;

    /**
     * Server information delivered by the auth command; empty until authentication succeeds.
     *
     * @var mixed[]
     */
    protected $serverInfo;

    /**
     * Current life-cycle state, one of the STATE_* constants.
     *
     * @var int
     */
    protected $state;

    /**
     * Queue of commands waiting to be processed.
     *
     * @var Queue|QueueInterface
     */
    protected $queue;

    /**
     * Protocol parser; null until start() has obtained a stream.
     *
     * @var ProtocolParser|null
     */
    protected $parser;

    /**
     * Stream resolved by connect(); null until start() has obtained it.
     *
     * @var Socket|SocketInterface|null
     */
    protected $stream;

    /**
     * Container of transactions opened through beginTransaction().
     *
     * @var TransactionBoxInterface
     */
    protected $trans;
102
103
    /**
     * Build a client bound to the given loop. No connection is opened here; that happens in start().
     *
     * @param LoopInterface $loop
     * @param mixed[] $config values that override the defaults provided by createConfig()
     */
    public function __construct(LoopInterface $loop, $config = [])
    {
        $this->loop   = $loop;
        $this->config = $this->createConfig($config);

        // Nothing is known about the server until authentication has succeeded.
        $this->serverInfo = [];
        $this->state      = self::STATE_INIT;

        $this->queue = $this->createQueue();

        // Parser and stream are only created once start() has connected.
        $this->parser = null;
        $this->stream = null;

        $this->trans = $this->createTransactionBox();
    }
118
119
    /**
     * Report whether the client is paused.
     *
     * Pausing is not implemented yet, so the answer is always false.
     *
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        // TODO: return the real paused flag once pause()/resume() are implemented.
        return false;
    }
128
129
    /**
     * Pause the client. Not implemented yet; calling it currently has no effect.
     *
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        // TODO: implement pausing.
    }
137
138
    /**
     * Resume the client. Not implemented yet; calling it currently has no effect.
     *
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        // TODO: implement resuming.
    }
146
147
    /**
     * Tell whether start() has already been initiated.
     *
     * Any state numerically at or above STATE_CONNECT_PENDING counts as started.
     *
     * @override
     * @inheritDoc
     */
    public function isStarted()
    {
        $threshold = self::STATE_CONNECT_PENDING;

        return $this->state >= $threshold;
    }
155
156
    /**
     * Connect to the server and authenticate.
     *
     * When the client is already started an already-resolved promise carrying this instance is
     * returned. Otherwise the returned promise resolves with the server info delivered by the auth
     * command, and rejects when authentication fails or when the connection could not be set up.
     *
     * @override
     * @inheritDoc
     */
    public function start()
    {
        if ($this->isStarted())
        {
            return Promise::doResolve($this);
        }

        $promise = new Promise();

        $this->state = self::STATE_CONNECT_PENDING;
        $config = $this->config;

        // Authentication was refused by the server.
        $errorHandler = function ($command, $reason) use ($promise) {
            $this->state = self::STATE_AUTH_FAILED;
            return $promise->reject($reason);
        };

        // Authentication succeeded; remember what the server told us about itself.
        $connectedHandler = function ($command, $info) use ($promise) {
            $this->state = self::STATE_AUTH_SUCCEEDED;
            $this->serverInfo = $info;
            return $promise->resolve($info);
        };

        // Connecting or wiring up the stream failed before any auth result could arrive.
        // Settle the promise so the caller is not left waiting forever, and leave the
        // "started" range of states so that start() may be attempted again.
        $failureHandler = function ($reason) use ($promise) {
            $this->state = self::STATE_CONNECT_FAILED;
            $promise->reject($reason);
            $this->handleError($reason);
        };

        $this
            ->connect()
            ->then(function($stream) use ($config, $errorHandler, $connectedHandler) {
                $this->stream = $stream;

                $stream->on('error', [ $this, 'handleSocketError' ]);
                $stream->on('close', [ $this, 'handleSocketClose' ]);

                $this->state  = self::STATE_AUTH_PENDING;
                $this->parser = new ProtocolParser($stream, $this->queue, $config);

                // The auth command is placed in the queue before the parser starts.
                $command = $this->doAuth(new AuthCommand($this));
                $command->on('success', $connectedHandler);
                $command->on('error', $errorHandler);

                $this->parser->start();
            })
            ->done(null, $failureHandler);

        return $promise;
    }
205
206
    /**
     * Send a quit command and disconnect.
     *
     * When the client is not started an already-resolved promise carrying this instance is returned.
     * Otherwise the returned promise resolves with this instance once the quit command succeeds, and
     * rejects with the reported reason when the quit command fails.
     *
     * @override
     * @inheritDoc
     */
    public function stop()
    {
        if (!$this->isStarted())
        {
            return Promise::doResolve($this);
        }
        return new Promise(function($resolve, $reject) {
            $command = $this->doCommand(new QuitCommand($this));

            $command->on('success', function() use($resolve) {
                $this->state = self::STATE_DISCONNECT_SUCCEEDED;
                $this->emit('stop', [ $this ]);
                $resolve($this);
            });
            // Without this listener a failed quit would leave the promise pending forever.
            $command->on('error', function($command, $reason) use($reject) {
                $reject($reason);
            });

            // Set only after the command was accepted: doCommand() refuses to queue
            // anything once the state is STATE_DISCONNECT_PENDING.
            $this->state = self::STATE_DISCONNECT_PENDING;
        });
    }
227
228
    /**
     * Return the current life-cycle state, one of the STATE_* constants.
     *
     * @override
     * @inheritDoc
     */
    public function getState()
    {
        $current = $this->state;

        return $current;
    }
236
237
    /**
     * Return the server information stored on successful authentication.
     *
     * The array is empty as long as start() has not completed successfully.
     *
     * @override
     * @inheritDoc
     */
    public function getInfo()
    {
        $info = $this->serverInfo;

        return $info;
    }
245
246
    /**
     * Switch the default database by issuing a USE statement.
     *
     * @override
     * @inheritDoc
     */
    public function setDatabase($dbname)
    {
        // Inside a backtick-quoted identifier a literal backtick has to be doubled. Without this
        // a crafted name could close the identifier early and append arbitrary SQL.
        $identifier = str_replace('`', '``', $dbname);

        return $this->query(sprintf('USE `%s`', $identifier));
    }
254
255
    /**
     * Return the currently selected database.
     *
     * Not implemented yet: the method has no return statement and therefore yields null.
     *
     * @override
     * @inheritDoc
     */
    public function getDatabase()
    {
        // TODO: implement.
    }
263
264
    /**
     * Queue an SQL statement for execution.
     *
     * The returned promise resolves with the finished command object and rejects with the error
     * the command reports. An ExecutionException from doCommand() is thrown synchronously.
     *
     * @override
     * @inheritDoc
     */
    public function query($sql, $sqlParams = [])
    {
        $deferred = new Promise();
        $command  = new QueryCommand($this, new Query($sql, $sqlParams));

        $this->doCommand($command);

        $command->on('success', function ($finished) use ($deferred) {
            return $deferred->resolve($finished);
        });
        $command->on('error', function ($failed, $reason) use ($deferred) {
            return $deferred->reject($reason);
        });

        return $deferred;
    }
285
286
    /**
     * Run a statement through query() and resolve with the command's affectedRows value.
     *
     * @override
     * @inheritDoc
     */
    public function execute($sql, $sqlParams = [])
    {
        $pending = $this->query($sql, $sqlParams);

        return $pending->then(function($finished) {
            return $finished->affectedRows;
        });
    }
296
297
    /**
     * Queue a ping command.
     *
     * The returned promise resolves without a value on success and rejects with the reported
     * reason on failure.
     *
     * @override
     * @inheritDoc
     */
    public function ping()
    {
        $deferred = new Promise();
        $ping     = $this->doCommand(new PingCommand($this));

        $ping->on('success', function () use ($deferred) {
            return $deferred->resolve();
        });
        $ping->on('error', function ($failed, $reason) use ($deferred) {
            return $deferred->reject($reason);
        });

        return $deferred;
    }
315
316
    /**
     * Create a new Transaction bound to this connection and hand it to the transaction box.
     *
     * @override
     * @inheritDoc
     */
    public function beginTransaction()
    {
        $transaction = new Transaction($this);

        return $this->trans->enqueue($transaction);
    }
324
325
    /**
     * End the given transaction. Not implemented yet; calling it currently has no effect.
     *
     * @override
     * @inheritDoc
     */
    public function endTransaction(TransactionInterface $trans)
    {
        // TODO: implement.
    }
333
334
    /**
     * Tell whether the transaction box currently holds at least one transaction.
     *
     * @override
     * @inheritDoc
     */
    public function inTransaction()
    {
        $boxIsEmpty = $this->trans->isEmpty();

        return !$boxIsEmpty;
    }
342
343
    /**
     * Re-emit a failure as an 'error' event carrying this instance and the error.
     *
     * @internal
     */
    public function handleError($err)
    {
        $payload = [ $this, $err ];

        $this->emit('error', $payload);
    }
350
351
    /**
     * Listener for the stream's 'error' event; forwards the error as this object's 'error' event.
     *
     * The $socket argument is accepted to match the event signature but is not used.
     *
     * @internal
     */
    public function handleSocketError($socket, $err)
    {
        $payload = [ $this, $err ];

        $this->emit('error', $payload);
    }
358
359
    /**
     * Listener for the stream's 'close' event.
     *
     * A close that arrives while a disconnect is pending is expected and ignored. In every other
     * state the connection is marked as disconnected and an 'error' event is emitted.
     *
     * @internal
     */
    public function handleSocketClose()
    {
        if ($this->state >= self::STATE_DISCONNECT_PENDING)
        {
            return;
        }

        $this->state = self::STATE_DISCONNECT_SUCCEEDED;
        $this->emit('error', [ $this, new RuntimeException('MySQL server has gone away!') ]);
    }
370
371
    /**
     * Put the authentication command at the front of the queue.
     *
     * Only a command that identifies itself as Command::INIT_AUTHENTICATE is accepted.
     *
     * @param CommandInterface $command
     * @return CommandInterface
     * @throws ExecutionException when the command is not an auth command or the queue cannot unshift
     */
    protected function doAuth(CommandInterface $command)
    {
        // unshift() is not declared by QueueInterface, so verify that the configured queue offers
        // it; otherwise the call would end in a fatal "undefined method" error instead of the
        // documented ExecutionException.
        $canUnshift = is_callable([ $this->queue, 'unshift' ]);

        if ($canUnshift && $command->equals(Command::INIT_AUTHENTICATE))
        {
            return $this->queue->unshift($command);
        }
        throw new ExecutionException("Can't send command");
    }
386
387
    /**
     * Append a command to the queue.
     *
     * Commands are accepted only while the state lies between STATE_CONNECT_PENDING and
     * STATE_AUTH_SUCCEEDED (inclusive); in particular nothing is accepted before start() or once
     * a disconnect is pending.
     *
     * @param CommandInterface $command
     * @return CommandInterface
     * @throws ExecutionException when the current state does not allow sending commands
     */
    protected function doCommand(CommandInterface $command)
    {
        $acceptsCommands = $this->state >= self::STATE_CONNECT_PENDING
            && $this->state <= self::STATE_AUTH_SUCCEEDED;

        if ($acceptsCommands)
        {
            return $this->queue->enqueue($command);
        }
        throw new ExecutionException("Can't send command");
    }
402
403
    /**
     * Create a socket for the configured 'endpoint' and wrap it in a resolved promise.
     *
     * @return PromiseInterface
     */
    protected function connect()
    {
        $socket = new Socket($this->config['endpoint'], $this->getLoop());

        return Promise::doResolve($socket);
    }
414
415
    /**
     * Factory for the command queue used by this connection.
     *
     * @return Queue|QueueInterface
     */
    protected function createQueue()
    {
        $queue = new Queue();

        return $queue;
    }
424
425
    /**
     * Factory for the transaction box used by this connection.
     *
     * @return TransactionBoxInterface
     */
    protected function createTransactionBox()
    {
        $box = new TransactionBox();

        return $box;
    }
434
435
    /**
     * Build the effective configuration array.
     *
     * The given values are merged over the built-in defaults with array_merge(), so any key
     * present in $config replaces the default of the same name.
     *
     * @param mixed[] $config
     * @return mixed[]
     */
    protected function createConfig($config = [])
    {
        return array_merge(
            [
                'endpoint' => 'tcp://127.0.0.1:3306',
                'user'     => 'root',
                'pass'     => '',
                'dbname'   => '',
            ],
            $config
        );
    }
451
}
452