1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dazzle\MySQL; |
4
|
|
|
|
5
|
|
|
use Dazzle\Event\BaseEventEmitter; |
6
|
|
|
use Dazzle\Loop\LoopAwareTrait; |
7
|
|
|
use Dazzle\Loop\LoopInterface; |
8
|
|
|
use Dazzle\MySQL\Protocol\Command\AuthCommand; |
9
|
|
|
use Dazzle\MySQL\Protocol\Command\PingCommand; |
10
|
|
|
use Dazzle\MySQL\Protocol\Command\QueryCommand; |
11
|
|
|
use Dazzle\MySQL\Protocol\Command\QuitCommand; |
12
|
|
|
use Dazzle\MySQL\Protocol\Command; |
13
|
|
|
use Dazzle\MySQL\Protocol\CommandInterface; |
14
|
|
|
use Dazzle\MySQL\Protocol\ProtocolParser; |
15
|
|
|
use Dazzle\MySQL\Protocol\Query; |
16
|
|
|
use Dazzle\MySQL\Protocol\QueryInterface; |
17
|
|
|
use Dazzle\MySQL\Support\Queue\Queue; |
18
|
|
|
use Dazzle\MySQL\Support\Queue\QueueInterface; |
19
|
|
|
use Dazzle\MySQL\Support\Transaction\TransactionBox; |
20
|
|
|
use Dazzle\MySQL\Support\Transaction\TransactionBoxInterface; |
21
|
|
|
use Dazzle\Promise\Promise; |
22
|
|
|
use Dazzle\Promise\PromiseInterface; |
23
|
|
|
use Dazzle\Socket\Socket; |
24
|
|
|
use Dazzle\Socket\SocketInterface; |
25
|
|
|
use Dazzle\Throwable\Exception\Runtime\ExecutionException; |
26
|
|
|
use RuntimeException; |
27
|
|
|
|
28
|
|
|
class Database extends BaseEventEmitter implements DatabaseInterface |
29
|
|
|
{ |
30
|
|
|
use LoopAwareTrait; |
31
|
|
|
|
32
|
|
|
/** |
33
|
|
|
* @var int |
34
|
|
|
*/ |
35
|
|
|
const STATE_INIT = 0; |
36
|
|
|
|
37
|
|
|
/** |
38
|
|
|
* @var int |
39
|
|
|
*/ |
40
|
|
|
const STATE_CONNECT_PENDING = 4; |
41
|
|
|
|
42
|
|
|
/** |
43
|
|
|
* @var int |
44
|
|
|
*/ |
45
|
|
|
const STATE_CONNECT_FAILED = 2; |
46
|
|
|
|
47
|
|
|
/** |
48
|
|
|
* @var int |
49
|
|
|
*/ |
50
|
|
|
const STATE_CONNECT_SUCCEEDED = 6; |
51
|
|
|
|
52
|
|
|
/** |
53
|
|
|
* @var int |
54
|
|
|
*/ |
55
|
|
|
const STATE_AUTH_PENDING = 5; |
56
|
|
|
|
57
|
|
|
/** |
58
|
|
|
* @var int |
59
|
|
|
*/ |
60
|
|
|
const STATE_AUTH_FAILED = 3; |
61
|
|
|
|
62
|
|
|
/** |
63
|
|
|
* @var int |
64
|
|
|
*/ |
65
|
|
|
const STATE_AUTH_SUCCEEDED = 7; |
66
|
|
|
|
67
|
|
|
/** |
68
|
|
|
* @var int |
69
|
|
|
*/ |
70
|
|
|
const STATE_DISCONNECT_PENDING = 8; |
71
|
|
|
|
72
|
|
|
/** |
73
|
|
|
* @var int |
74
|
|
|
*/ |
75
|
|
|
const STATE_DISCONNECT_SUCCEEDED = 1; |
76
|
|
|
|
77
|
|
|
/** |
78
|
|
|
* @var mixed[] |
79
|
|
|
*/ |
80
|
|
|
protected $config; |
81
|
|
|
|
82
|
|
|
/** |
83
|
|
|
* @var mixed[] |
84
|
|
|
*/ |
85
|
|
|
protected $serverInfo; |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* @var int |
89
|
|
|
*/ |
90
|
|
|
protected $state; |
91
|
|
|
|
92
|
|
|
/** |
93
|
|
|
* @var Queue|QueueInterface |
94
|
|
|
*/ |
95
|
|
|
protected $queue; |
96
|
|
|
|
97
|
|
|
protected $parser; |
98
|
|
|
|
99
|
|
|
protected $stream; |
100
|
|
|
|
101
|
|
|
protected $trans; |
102
|
|
|
|
103
|
|
|
/** |
104
|
|
|
* @param LoopInterface $loop |
105
|
|
|
* @param mixed[] $config |
106
|
|
|
*/ |
107
|
|
|
public function __construct(LoopInterface $loop, $config = []) |
108
|
|
|
{ |
109
|
|
|
$this->loop = $loop; |
110
|
|
|
$this->config = $this->createConfig($config); |
111
|
|
|
$this->serverInfo = []; |
112
|
|
|
$this->state = self::STATE_INIT; |
113
|
|
|
$this->queue = $this->createQueue(); |
114
|
|
|
$this->parser = null; |
115
|
|
|
$this->stream = null; |
116
|
|
|
$this->trans = $this->createTransactionBox(); |
117
|
|
|
} |
118
|
|
|
|
119
|
|
|
/** |
120
|
|
|
* @override |
121
|
|
|
* @inheritDoc |
122
|
|
|
*/ |
123
|
|
|
public function isPaused() |
124
|
|
|
{ |
125
|
|
|
// TODO |
126
|
|
|
return false; |
127
|
|
|
} |
128
|
|
|
|
129
|
|
|
/** |
130
|
|
|
* @override |
131
|
|
|
* @inheritDoc |
132
|
|
|
*/ |
133
|
|
|
public function pause() |
134
|
|
|
{ |
135
|
|
|
// TODO |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
/** |
139
|
|
|
* @override |
140
|
|
|
* @inheritDoc |
141
|
|
|
*/ |
142
|
|
|
public function resume() |
143
|
|
|
{ |
144
|
|
|
// TODO |
145
|
|
|
} |
146
|
|
|
|
147
|
|
|
/** |
148
|
|
|
* @override |
149
|
|
|
* @inheritDoc |
150
|
|
|
*/ |
151
|
|
|
public function isStarted() |
152
|
|
|
{ |
153
|
|
|
return $this->state >= self::STATE_CONNECT_PENDING; |
154
|
|
|
} |
155
|
|
|
|
156
|
|
|
/** |
157
|
|
|
* @override |
158
|
|
|
* @inheritDoc |
159
|
|
|
*/ |
160
|
|
|
public function start() |
161
|
|
|
{ |
162
|
|
|
if ($this->isStarted()) |
163
|
|
|
{ |
164
|
|
|
return Promise::doResolve($this); |
165
|
|
|
} |
166
|
|
|
|
167
|
|
|
$promise = new Promise(); |
168
|
|
|
|
169
|
|
|
$this->state = self::STATE_CONNECT_PENDING; |
170
|
|
|
$config = $this->config; |
171
|
|
|
|
172
|
|
|
$errorHandler = function ($command, $reason) use ($promise) { |
173
|
|
|
$this->state = self::STATE_AUTH_FAILED; |
174
|
|
|
return $promise->reject($reason); |
175
|
|
|
}; |
176
|
|
|
|
177
|
|
|
$connectedHandler = function ($command, $info) use ($promise) { |
178
|
|
|
$this->state = self::STATE_AUTH_SUCCEEDED; |
179
|
|
|
$this->serverInfo = $info; |
180
|
|
|
return $promise->resolve($info); |
181
|
|
|
}; |
182
|
|
|
|
183
|
|
|
$this |
184
|
|
|
->connect() |
185
|
|
|
->then(function($stream) use ($config, $errorHandler, $connectedHandler) { |
186
|
|
|
$this->stream = $stream; |
187
|
|
|
|
188
|
|
|
$stream->on('error', [ $this, 'handleSocketError' ]); |
189
|
|
|
$stream->on('close', [ $this, 'handleSocketClose' ]); |
190
|
|
|
|
191
|
|
|
$this->state = self::STATE_AUTH_PENDING; |
192
|
|
|
$this->parser = new ProtocolParser($stream, $this->queue, $config); |
193
|
|
|
|
194
|
|
|
$command = $this->doAuth(new AuthCommand($this)); |
195
|
|
|
$command->on('success', $connectedHandler); |
196
|
|
|
$command->on('error', $errorHandler); |
197
|
|
|
|
198
|
|
|
//$parser->on('close', $closeHandler); |
|
|
|
|
199
|
|
|
$this->parser->start(); |
200
|
|
|
}) |
201
|
|
|
->done(null, [ $this, 'handleError' ]); |
202
|
|
|
|
203
|
|
|
return $promise; |
204
|
|
|
} |
205
|
|
|
|
206
|
|
|
/** |
207
|
|
|
* @override |
208
|
|
|
* @inheritDoc |
209
|
|
|
*/ |
210
|
|
|
public function stop() |
211
|
|
|
{ |
212
|
|
|
if (!$this->isStarted()) |
213
|
|
|
{ |
214
|
|
|
return Promise::doResolve($this); |
215
|
|
|
} |
216
|
|
|
return new Promise(function($resolve, $reject) { |
|
|
|
|
217
|
|
|
$this |
218
|
|
|
->doCommand(new QuitCommand($this)) |
219
|
|
|
->on('success', function() use($resolve) { |
220
|
|
|
$this->state = self::STATE_DISCONNECT_SUCCEEDED; |
221
|
|
|
$this->emit('stop', [ $this ]); |
222
|
|
|
$resolve($this); |
223
|
|
|
}); |
224
|
|
|
$this->state = self::STATE_DISCONNECT_PENDING; |
225
|
|
|
}); |
226
|
|
|
} |
227
|
|
|
|
228
|
|
|
/** |
229
|
|
|
* @override |
230
|
|
|
* @inheritDoc |
231
|
|
|
*/ |
232
|
|
|
public function getState() |
233
|
|
|
{ |
234
|
|
|
return $this->state; |
235
|
|
|
} |
236
|
|
|
|
237
|
|
|
/** |
238
|
|
|
* @override |
239
|
|
|
* @inheritDoc |
240
|
|
|
*/ |
241
|
|
|
public function getInfo() |
242
|
|
|
{ |
243
|
|
|
return $this->serverInfo; |
244
|
|
|
} |
245
|
|
|
|
246
|
|
|
/** |
247
|
|
|
* @override |
248
|
|
|
* @inheritDoc |
249
|
|
|
*/ |
250
|
|
|
public function setDatabase($dbname) |
251
|
|
|
{ |
252
|
|
|
return $this->query(sprintf('USE `%s`', $dbname)); |
253
|
|
|
} |
254
|
|
|
|
255
|
|
|
/** |
256
|
|
|
* @override |
257
|
|
|
* @inheritDoc |
258
|
|
|
*/ |
259
|
|
|
public function getDatabase() |
260
|
|
|
{ |
261
|
|
|
// TODO |
262
|
|
|
} |
263
|
|
|
|
264
|
|
|
/** |
265
|
|
|
* @override |
266
|
|
|
* @inheritDoc |
267
|
|
|
*/ |
268
|
|
|
public function query($sql, $sqlParams = []) |
269
|
|
|
{ |
270
|
|
|
$promise = new Promise(); |
271
|
|
|
$query = new Query($sql, $sqlParams); |
272
|
|
|
$command = new QueryCommand($this, $query); |
273
|
|
|
|
274
|
|
|
$this->doCommand($command); |
275
|
|
|
|
276
|
|
|
$command->on('error', function ($command, $err) use ($promise) { |
277
|
|
|
return $promise->reject($err); |
278
|
|
|
}); |
279
|
|
|
$command->on('success', function ($command) use ($promise) { |
280
|
|
|
return $promise->resolve($command); |
281
|
|
|
}); |
282
|
|
|
|
283
|
|
|
return $promise; |
284
|
|
|
} |
285
|
|
|
|
286
|
|
|
/** |
287
|
|
|
* @override |
288
|
|
|
* @inheritDoc |
289
|
|
|
*/ |
290
|
|
|
public function execute($sql, $sqlParams = []) |
291
|
|
|
{ |
292
|
|
|
return $this->query($sql, $sqlParams)->then(function($command) { |
293
|
|
|
return $command->affectedRows; |
294
|
|
|
}); |
295
|
|
|
} |
296
|
|
|
|
297
|
|
|
/** |
298
|
|
|
* @override |
299
|
|
|
* @inheritDoc |
300
|
|
|
*/ |
301
|
|
|
public function ping() |
302
|
|
|
{ |
303
|
|
|
$promise = new Promise(); |
304
|
|
|
|
305
|
|
|
$command = $this->doCommand(new PingCommand($this)); |
306
|
|
|
$command->on('error', function ($command, $reason) use ($promise) { |
307
|
|
|
return $promise->reject($reason); |
308
|
|
|
}); |
309
|
|
|
$command->on('success', function () use ($promise) { |
310
|
|
|
return $promise->resolve(); |
311
|
|
|
}); |
312
|
|
|
|
313
|
|
|
return $promise; |
314
|
|
|
} |
315
|
|
|
|
316
|
|
|
/** |
317
|
|
|
* @override |
318
|
|
|
* @inheritDoc |
319
|
|
|
*/ |
320
|
|
|
public function beginTransaction() |
321
|
|
|
{ |
322
|
|
|
return $this->trans->enqueue(new Transaction($this)); |
323
|
|
|
} |
324
|
|
|
|
325
|
|
|
/** |
326
|
|
|
* @override |
327
|
|
|
* @inheritDoc |
328
|
|
|
*/ |
329
|
|
|
public function endTransaction(TransactionInterface $trans) |
330
|
|
|
{ |
331
|
|
|
// TODO |
332
|
|
|
} |
333
|
|
|
|
334
|
|
|
/** |
335
|
|
|
* @override |
336
|
|
|
* @inheritDoc |
337
|
|
|
*/ |
338
|
|
|
public function inTransaction() |
339
|
|
|
{ |
340
|
|
|
return !$this->trans->isEmpty(); |
341
|
|
|
} |
342
|
|
|
|
343
|
|
|
/** |
344
|
|
|
* @internal |
345
|
|
|
*/ |
346
|
|
|
public function handleError($err) |
347
|
|
|
{ |
348
|
|
|
$this->emit('error', [ $this, $err ]); |
349
|
|
|
} |
350
|
|
|
|
351
|
|
|
/** |
352
|
|
|
* @internal |
353
|
|
|
*/ |
354
|
|
|
public function handleSocketError($socket, $err) |
355
|
|
|
{ |
356
|
|
|
$this->emit('error', [ $this, $err ]); |
357
|
|
|
} |
358
|
|
|
|
359
|
|
|
/** |
360
|
|
|
* @internal |
361
|
|
|
*/ |
362
|
|
|
public function handleSocketClose() |
363
|
|
|
{ |
364
|
|
|
if ($this->state < self::STATE_DISCONNECT_PENDING) |
365
|
|
|
{ |
366
|
|
|
$this->state = self::STATE_DISCONNECT_SUCCEEDED; |
367
|
|
|
$this->emit('error', [ $this, new RuntimeException('MySQL server has gone away!') ]); |
368
|
|
|
} |
369
|
|
|
} |
370
|
|
|
|
371
|
|
|
/** |
372
|
|
|
* Do auth command. |
373
|
|
|
* |
374
|
|
|
* @param CommandInterface $command |
375
|
|
|
* @return CommandInterface |
376
|
|
|
* @throws ExecutionException |
377
|
|
|
*/ |
378
|
|
|
protected function doAuth(CommandInterface $command) |
379
|
|
|
{ |
380
|
|
|
if ($command->equals(Command::INIT_AUTHENTICATE)) |
381
|
|
|
{ |
382
|
|
|
return $this->queue->unshift($command); |
|
|
|
|
383
|
|
|
} |
384
|
|
|
throw new ExecutionException("Cann't send command"); |
385
|
|
|
} |
386
|
|
|
|
387
|
|
|
/** |
388
|
|
|
* Do command. |
389
|
|
|
* |
390
|
|
|
* @param CommandInterface $command |
391
|
|
|
* @return CommandInterface |
392
|
|
|
* @throws ExecutionException |
393
|
|
|
*/ |
394
|
|
|
protected function doCommand(CommandInterface $command) |
395
|
|
|
{ |
396
|
|
|
if ($this->state >= self::STATE_CONNECT_PENDING && $this->state <= self::STATE_AUTH_SUCCEEDED) |
397
|
|
|
{ |
398
|
|
|
return $this->queue->enqueue($command); |
399
|
|
|
} |
400
|
|
|
throw new ExecutionException("Cann't send command"); |
401
|
|
|
} |
402
|
|
|
|
403
|
|
|
/** |
404
|
|
|
* Connect to the database endpoint. |
405
|
|
|
* |
406
|
|
|
* @return PromiseInterface |
407
|
|
|
*/ |
408
|
|
|
protected function connect() |
409
|
|
|
{ |
410
|
|
|
return Promise::doResolve( |
411
|
|
|
new Socket($this->config['endpoint'], $this->getLoop()) |
412
|
|
|
); |
413
|
|
|
} |
414
|
|
|
|
415
|
|
|
/** |
416
|
|
|
* Create Queue. |
417
|
|
|
* |
418
|
|
|
* @return Queue|QueueInterface |
419
|
|
|
*/ |
420
|
|
|
protected function createQueue() |
421
|
|
|
{ |
422
|
|
|
return new Queue(); |
423
|
|
|
} |
424
|
|
|
|
425
|
|
|
/** |
426
|
|
|
* Create transaction box. |
427
|
|
|
* |
428
|
|
|
* @return TransactionBoxInterface |
429
|
|
|
*/ |
430
|
|
|
protected function createTransactionBox() |
431
|
|
|
{ |
432
|
|
|
return new TransactionBox(); |
433
|
|
|
} |
434
|
|
|
|
435
|
|
|
/** |
436
|
|
|
* Create configuration file. |
437
|
|
|
* |
438
|
|
|
* @param mixed[] $config |
439
|
|
|
* @return mixed[] |
440
|
|
|
*/ |
441
|
|
|
protected function createConfig($config = []) |
442
|
|
|
{ |
443
|
|
|
$default = [ |
444
|
|
|
'endpoint' => 'tcp://127.0.0.1:3306', |
445
|
|
|
'user' => 'root', |
446
|
|
|
'pass' => '', |
447
|
|
|
'dbname' => '', |
448
|
|
|
]; |
449
|
|
|
return array_merge($default, $config); |
450
|
|
|
} |
451
|
|
|
} |
452
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.