Completed
Push — master ( 2f7083...d63bf6 )
by Kamil
02:31
created

Database::commitTransaction()   B

Complexity

Conditions 4
Paths 2

Size

Total Lines 38
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 38
ccs 0
cts 31
cp 0
rs 8.5806
cc 4
eloc 22
nc 2
nop 1
crap 20
1
<?php
2
3
namespace Dazzle\MySQL;
4
5
use Dazzle\Event\BaseEventEmitter;
6
use Dazzle\Loop\LoopAwareTrait;
7
use Dazzle\Loop\LoopInterface;
8
use Dazzle\MySQL\Protocol\Command\AuthCommand;
9
use Dazzle\MySQL\Protocol\Command\PingCommand;
10
use Dazzle\MySQL\Protocol\Command\QueryCommand;
11
use Dazzle\MySQL\Protocol\Command\QuitCommand;
12
use Dazzle\MySQL\Protocol\Command;
13
use Dazzle\MySQL\Protocol\CommandInterface;
14
use Dazzle\MySQL\Protocol\ProtocolParser;
15
use Dazzle\MySQL\Protocol\Query;
16
use Dazzle\MySQL\Protocol\QueryInterface;
17
use Dazzle\MySQL\Support\Queue\Queue;
18
use Dazzle\MySQL\Support\Queue\QueueInterface;
19
use Dazzle\MySQL\Support\Transaction\TransactionBox;
20
use Dazzle\MySQL\Support\Transaction\TransactionBoxInterface;
21
use Dazzle\Promise\Promise;
22
use Dazzle\Promise\PromiseInterface;
23
use Dazzle\Socket\Socket;
24
use Dazzle\Socket\SocketInterface;
25
use Dazzle\Throwable\Exception\Runtime\ExecutionException;
26
use RuntimeException;
27
use SplQueue;
28
29
class Database extends BaseEventEmitter implements DatabaseInterface
30
{
31
    use LoopAwareTrait;
32
33
    /**
     * Initial state, assigned by the constructor.
     *
     * The numeric values of the STATE_* constants are compared with relational operators
     * elsewhere in this class, so they must not be renumbered independently.
     *
     * @var int
     */
    const STATE_INIT = 0;

    /**
     * A connection attempt is in progress; set by start() before connecting.
     *
     * @var int
     */
    const STATE_CONNECT_PENDING = 4;

    /**
     * The connection attempt failed.
     *
     * @var int
     */
    const STATE_CONNECT_FAILED = 2;

    /**
     * The connection attempt succeeded.
     *
     * @var int
     */
    const STATE_CONNECT_SUCCEEDED = 6;

    /**
     * The socket is available and the authentication command has been queued.
     *
     * @var int
     */
    const STATE_AUTH_PENDING = 5;

    /**
     * The authentication command reported an error.
     *
     * @var int
     */
    const STATE_AUTH_FAILED = 3;

    /**
     * The authentication command succeeded; regular commands are accepted.
     *
     * @var int
     */
    const STATE_AUTH_SUCCEEDED = 7;

    /**
     * A quit command has been queued by stop() and has not completed yet.
     *
     * @var int
     */
    const STATE_DISCONNECT_PENDING = 8;

    /**
     * The connection is closed, either after a quit command or after the socket closed.
     *
     * @var int
     */
    const STATE_DISCONNECT_SUCCEEDED = 1;
77
78
    /**
     * Connection settings as produced by createConfig().
     *
     * @var mixed[]
     */
    protected $config;

    /**
     * Server information delivered with a successful authentication; empty array until then.
     *
     * @var mixed[]
     */
    protected $serverInfo;

    /**
     * Current connection state, one of the STATE_* constants.
     *
     * @var int
     */
    protected $state;

    /**
     * Queue of commands waiting to be processed; handed to the protocol parser in start().
     *
     * @var Queue|QueueInterface
     */
    protected $queue;

    /**
     * Protocol parser, created in start() once the socket is available; null before that.
     *
     * @var ProtocolParser|null
     */
    protected $parser;

    /**
     * Socket to the server, assigned in start(); null before that.
     *
     * @var SocketInterface|null
     */
    protected $stream;

    /**
     * Container of transactions opened through beginTransaction() and not yet finished.
     *
     * @var TransactionBoxInterface
     */
    protected $transBox;
112
113
    /**
     * Create a database client bound to the given loop.
     *
     * No connection is opened here; the instance starts in STATE_INIT with an empty
     * server info array and without a parser or socket.
     *
     * @param LoopInterface $loop
     * @param mixed[] $config settings overriding the defaults of createConfig()
     */
    public function __construct(LoopInterface $loop, $config = [])
    {
        $this->loop   = $loop;
        $this->config = $this->createConfig($config);

        // Nothing is known about the server and nothing is connected yet.
        $this->serverInfo = [];
        $this->state      = self::STATE_INIT;

        $this->queue = $this->createQueue();

        $this->parser = null;
        $this->stream = null;

        $this->transBox = $this->createTransactionBox();
    }
128
129
    /**
     * Pausing is not implemented yet, so this always reports false.
     *
     * @override
     * @inheritDoc
     */
    public function isPaused()
    {
        // TODO: track the paused state once pause()/resume() are implemented.
        $paused = false;

        return $paused;
    }
138
139
    /**
     * Not implemented yet; calling it has no effect.
     *
     * @override
     * @inheritDoc
     */
    public function pause()
    {
        // TODO: implement pausing.
    }
147
148
    /**
     * Not implemented yet; calling it has no effect.
     *
     * @override
     * @inheritDoc
     */
    public function resume()
    {
        // TODO: implement resuming.
    }
156
157
    /**
     * Tell whether the state is STATE_CONNECT_PENDING or any higher-numbered state.
     *
     * @override
     * @inheritDoc
     */
    public function isStarted()
    {
        return self::STATE_CONNECT_PENDING <= $this->state;
    }
165
166
    /**
     * Connect to the server and authenticate.
     *
     * If the database is already started, a promise resolved with this instance is returned.
     * Otherwise the returned promise is resolved with the server info once the authentication
     * command succeeds, and rejected with the reason when authentication fails or when
     * connecting / setting up the handshake fails.
     *
     * @override
     * @inheritDoc
     */
    public function start()
    {
        if ($this->isStarted())
        {
            return Promise::doResolve($this);
        }

        $promise = new Promise();

        $this->state = self::STATE_CONNECT_PENDING;
        $config = $this->config;

        $errorHandler = function ($command, $reason) use ($promise) {
            $this->state = self::STATE_AUTH_FAILED;
            return $promise->reject($reason);
        };

        $connectedHandler = function ($command, $info) use ($promise) {
            $this->state = self::STATE_AUTH_SUCCEEDED;
            $this->serverInfo = $info;
            return $promise->resolve($info);
        };

        $this
            ->connect()
            ->then(function($stream) use ($config, $errorHandler, $connectedHandler) {
                $this->stream = $stream;

                $stream->on('error', [ $this, 'handleSocketError' ]);
                $stream->on('close', [ $this, 'handleSocketClose' ]);

                $this->state  = self::STATE_AUTH_PENDING;
                $this->parser = new ProtocolParser($stream, $this->queue, $config);

                // The auth command is put in front of anything already waiting in the queue.
                $command = $this->doAuth(new AuthCommand($this));
                $command->on('success', $connectedHandler);
                $command->on('error', $errorHandler);

                $this->parser->start();
            })
            ->done(null, function($ex) use ($promise) {
                // Without this the caller's promise would stay pending forever and the state
                // would remain STATE_CONNECT_PENDING, making isStarted() true and blocking a retry.
                $this->state = self::STATE_CONNECT_FAILED;
                $promise->reject($ex);
                $this->handleError($ex);
            });

        return $promise;
    }
214
215
    /**
     * Queue a quit command and close the session.
     *
     * If the database is not started, a promise resolved with this instance is returned.
     * Otherwise the returned promise is resolved with this instance after the quit command
     * succeeds (the 'stop' event is emitted first), and rejected with the reason if the quit
     * command reports an error. In the error case the state is left at STATE_DISCONNECT_PENDING.
     *
     * @override
     * @inheritDoc
     */
    public function stop()
    {
        if (!$this->isStarted())
        {
            return Promise::doResolve($this);
        }

        return new Promise(function($resolve, $reject) {
            $command = $this->doCommand(new QuitCommand($this));

            $command->on('success', function() use($resolve) {
                $this->state = self::STATE_DISCONNECT_SUCCEEDED;
                $this->emit('stop', [ $this ]);
                $resolve($this);
            });
            // Propagate a failed quit instead of leaving the promise pending forever.
            $command->on('error', function($command, $reason) use($reject) {
                $reject($reason);
            });

            // Must happen after doCommand(): doCommand() refuses commands in this state.
            $this->state = self::STATE_DISCONNECT_PENDING;
        });
    }
236
237
    /**
     * Return the current state, one of the STATE_* constants.
     *
     * @override
     * @inheritDoc
     */
    public function getState()
    {
        $current = $this->state;

        return $current;
    }
245
246
    /**
     * Return the server info stored on successful authentication (empty array before that).
     *
     * @override
     * @inheritDoc
     */
    public function getInfo()
    {
        $info = $this->serverInfo;

        return $info;
    }
254
255
    /**
     * Switch the default database by issuing a USE statement through query().
     *
     * The name is placed inside a backtick-quoted identifier, so any backtick it contains is
     * doubled; otherwise such a name would terminate the identifier early and corrupt the
     * statement.
     *
     * @override
     * @inheritDoc
     */
    public function setDatabase($dbname)
    {
        $escaped = str_replace('`', '``', (string) $dbname);

        return $this->query(sprintf('USE `%s`', $escaped));
    }
263
264
    /**
     * Not implemented yet; currently yields null.
     *
     * @override
     * @inheritDoc
     */
    public function getDatabase()
    {
        // TODO: return the currently selected database.
        return null;
    }
272
273
    /**
     * Queue an SQL query and return a promise for its outcome.
     *
     * The promise is resolved with the finished command on 'success' and rejected with the
     * reported error on 'error'. An ExecutionException from doCommand() is thrown synchronously.
     *
     * @override
     * @inheritDoc
     */
    public function query($sql, $sqlParams = [])
    {
        $deferred = new Promise();
        $command  = new QueryCommand($this, new Query($sql, $sqlParams));

        $this->doCommand($command);

        $command->on('error', function ($failed, $reason) use ($deferred) {
            return $deferred->reject($reason);
        });
        $command->on('success', function ($finished) use ($deferred) {
            return $deferred->resolve($finished);
        });

        return $deferred;
    }
294
295
    /**
     * Run a query and resolve with the affectedRows value of the finished command.
     *
     * @override
     * @inheritDoc
     */
    public function execute($sql, $sqlParams = [])
    {
        $pickAffectedRows = function($finished) {
            return $finished->affectedRows;
        };

        return $this->query($sql, $sqlParams)->then($pickAffectedRows);
    }
305
306
    /**
     * Queue a ping command.
     *
     * The returned promise is resolved without a value on 'success' and rejected with the
     * reported reason on 'error'.
     *
     * @override
     * @inheritDoc
     */
    public function ping()
    {
        $deferred = new Promise();
        $ping     = $this->doCommand(new PingCommand($this));

        $ping->on('error', function ($failed, $why) use ($deferred) {
            return $deferred->reject($why);
        });
        $ping->on('success', function () use ($deferred) {
            return $deferred->resolve();
        });

        return $deferred;
    }
324
325
    /**
     * Create a transaction, wire its 'commit' and 'rollback' events and register it.
     *
     * On 'commit' the transaction's queued commands are handed to commitTransaction(); the
     * transaction then receives 'success' or 'error' depending on the outcome. In both the
     * commit and the rollback case the transaction is removed from the transaction box.
     *
     * @override
     * @inheritDoc
     */
    public function beginTransaction()
    {
        $trans = new Transaction($this);

        $onCommit = function(TransactionInterface $trans, SplQueue $queue) {
            $notifySuccess = function() use($trans) {
                return $trans->emit('success', [ $trans ]);
            };
            $notifyFailure = function($ex) use($trans) {
                return $trans->emit('error', [ $trans, $ex ]);
            };

            $this->commitTransaction($queue)->then($notifySuccess, $notifyFailure);

            // The transaction is unregistered right away, without waiting for the outcome.
            $this->transBox->remove($trans);
        };

        $onRollback = function(TransactionInterface $trans) {
            $this->transBox->remove($trans);
        };

        $trans->on('commit', $onCommit);
        $trans->on('rollback', $onRollback);

        return $this->transBox->add($trans);
    }
350
351
    /**
     * End the given transaction by rolling it back; returns whatever rollback() returns.
     *
     * @override
     * @inheritDoc
     */
    public function endTransaction(TransactionInterface $trans)
    {
        $result = $trans->rollback();

        return $result;
    }
359
360
    /**
     * Try to commit a transaction.
     *
     * The queued commands are sent wrapped in START TRANSACTION ... COMMIT. COMMIT is issued
     * only after every command (including START TRANSACTION) has reported success. On the
     * first command error a ROLLBACK is queued and the promise is rejected with that error;
     * later errors are ignored. The passed queue is emptied by this method.
     *
     * @param SplQueue $queue commands collected by the transaction
     * @return PromiseInterface resolved without a value after COMMIT succeeds, rejected otherwise
     * @throws ExecutionException when doCommand() refuses a command in the current state
     */
    protected function commitTransaction(SplQueue $queue)
    {
        $promise = new Promise();
        $ex = null;

        // BEGIN is a synonym of START TRANSACTION in MySQL; sending both only made the second
        // statement implicitly commit the empty transaction opened by the first.
        $queue->unshift(new QueryCommand($this, new Query('START TRANSACTION')));

        $size = 0;
        $sizeCap = $queue->count();

        while (!$queue->isEmpty())
        {
            $command = $this->doCommand($queue->dequeue());
            $command->on('error', function($command, $err) use(&$ex, $promise) {
                // Only the first failure triggers the rollback and settles the promise.
                if ($ex === null)
                {
                    $ex = $err;
                    $this->doCommand(new QueryCommand($this, new Query('ROLLBACK')));
                    $promise->reject($ex);
                }
            });
            $command->on('success', function() use (&$size, $sizeCap, $promise) {
                // $size counts successes only, so the cap is reached only if nothing failed.
                if (++$size >= $sizeCap)
                {
                    $commit = $this->doCommand(new QueryCommand($this, new Query('COMMIT')));
                    $commit->on('success', function() use($promise) {
                        return $promise->resolve();
                    });
                    $commit->on('error', function($command, $err) use($promise) {
                        return $promise->reject($err);
                    });
                }
            });
        }

        return $promise;
    }
404
405
    /**
     * Tell whether at least one transaction is currently registered in the transaction box.
     *
     * @override
     * @inheritDoc
     */
    public function inTransaction()
    {
        $noneOpen = $this->transBox->isEmpty();

        return !$noneOpen;
    }
413
414
    /**
     * Re-emit the given error as an 'error' event of this object.
     *
     * @internal
     */
    public function handleError($err)
    {
        $payload = [ $this, $err ];

        $this->emit('error', $payload);
    }
421
422
    /**
     * Socket 'error' listener: forwards the error as an 'error' event of this object.
     *
     * The socket argument is not used.
     *
     * @internal
     */
    public function handleSocketError($socket, $err)
    {
        $payload = [ $this, $err ];

        $this->emit('error', $payload);
    }
429
430
    /**
     * Socket 'close' listener: reports an unexpected loss of the connection.
     *
     * Only a close that happens while the connection is in use (STATE_CONNECT_PENDING up to
     * STATE_AUTH_SUCCEEDED) is treated as an error. A close seen while a quit is pending, after
     * a completed disconnect, or in a failed/initial state is not reported; previously those
     * low-numbered states also satisfied the check and produced a spurious error.
     *
     * @internal
     */
    public function handleSocketClose()
    {
        $connectionInUse = $this->state >= self::STATE_CONNECT_PENDING
            && $this->state < self::STATE_DISCONNECT_PENDING;

        if ($connectionInUse)
        {
            $this->state = self::STATE_DISCONNECT_SUCCEEDED;
            $this->emit('error', [ $this, new RuntimeException('MySQL server has gone away!') ]);
        }
    }
441
442
    /**
     * Put the authentication command at the front of the command queue.
     *
     * Only a command matching Command::INIT_AUTHENTICATE is accepted.
     *
     * @param CommandInterface $command
     * @return CommandInterface
     * @throws ExecutionException when the command is not an authentication command
     */
    protected function doAuth(CommandInterface $command)
    {
        if (!$command->equals(Command::INIT_AUTHENTICATE))
        {
            throw new ExecutionException("Cann't send command");
        }

        // NOTE(review): static analysis reports that unshift() exists on Queue but is not
        // declared in QueueInterface, so this relies on the concrete queue type.
        return $this->queue->unshift($command);
    }
457
458
    /**
     * Append a command to the command queue.
     *
     * Commands are accepted only while the state lies between STATE_CONNECT_PENDING and
     * STATE_AUTH_SUCCEEDED (inclusive).
     *
     * @param CommandInterface $command
     * @return CommandInterface
     * @throws ExecutionException when the current state does not accept commands
     */
    protected function doCommand($command)
    {
        $tooEarly = $this->state < self::STATE_CONNECT_PENDING;
        $tooLate  = $this->state > self::STATE_AUTH_SUCCEEDED;

        if ($tooEarly || $tooLate)
        {
            throw new ExecutionException("Cann't send command");
        }

        return $this->queue->enqueue($command);
    }
473
474
    /**
     * Create a socket for the configured endpoint and wrap it in a resolved promise.
     *
     * @return PromiseInterface
     */
    protected function connect()
    {
        $socket = new Socket($this->config['endpoint'], $this->getLoop());

        return Promise::doResolve($socket);
    }
485
486
    /**
     * Factory for the command queue used by this instance.
     *
     * @return Queue|QueueInterface
     */
    protected function createQueue()
    {
        $queue = new Queue();

        return $queue;
    }
495
496
    /**
     * Factory for the transaction box used by this instance.
     *
     * @return TransactionBoxInterface
     */
    protected function createTransactionBox()
    {
        $box = new TransactionBox();

        return $box;
    }
505
506
    /**
     * Build the effective configuration by merging the given settings over the defaults.
     *
     * Defaults: endpoint tcp://127.0.0.1:3306, user root, empty password, empty database name.
     *
     * @param mixed[] $config
     * @return mixed[]
     */
    protected function createConfig($config = [])
    {
        return array_merge(
            [
                'endpoint' => 'tcp://127.0.0.1:3306',
                'user'     => 'root',
                'pass'     => '',
                'dbname'   => '',
            ],
            $config
        );
    }
522
}
523