1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dazzle\MySQL; |
4
|
|
|
|
5
|
|
|
use Dazzle\Event\BaseEventEmitter; |
6
|
|
|
use Dazzle\Loop\LoopAwareTrait; |
7
|
|
|
use Dazzle\Loop\LoopInterface; |
8
|
|
|
use Dazzle\MySQL\Command\Command; |
9
|
|
|
use Dazzle\MySQL\Command\CommandInterface; |
10
|
|
|
use Dazzle\MySQL\Command\Concrete\AuthCommand; |
11
|
|
|
use Dazzle\MySQL\Command\Concrete\PingCommand; |
12
|
|
|
use Dazzle\MySQL\Command\Concrete\QueryCommand; |
13
|
|
|
use Dazzle\MySQL\Command\Concrete\QuitCommand; |
14
|
|
|
use Dazzle\MySQL\Protocol\ProtocolParser; |
15
|
|
|
use Dazzle\Promise\Promise; |
16
|
|
|
use Dazzle\Promise\PromiseInterface; |
17
|
|
|
use Dazzle\Socket\Socket; |
18
|
|
|
use Dazzle\Socket\SocketInterface; |
19
|
|
|
use Exception; |
20
|
|
|
use InvalidArgumentException; |
21
|
|
|
use RuntimeException; |
22
|
|
|
|
23
|
|
|
class Database extends BaseEventEmitter implements DatabaseInterface |
24
|
|
|
{ |
25
|
|
|
use LoopAwareTrait; |
26
|
|
|
|
27
|
|
|
const STATE_INIT = 0; |
28
|
|
|
const STATE_CONNECT_PENDING = 3; |
29
|
|
|
const STATE_CONNECT_FAILED = 1; |
30
|
|
|
const STATE_CONNECT_SUCCEEDED = 4; |
31
|
|
|
const STATE_AUTH_PENDING = 8; |
32
|
|
|
const STATE_AUTH_FAILED = 2; |
33
|
|
|
const STATE_AUTH_SUCCEEDED = 5; |
34
|
|
|
const STATE_CLOSEING = 6; |
35
|
|
|
const STATE_STOPPED = 7; |
36
|
|
|
|
37
|
|
|
protected $config; |
38
|
|
|
|
39
|
|
|
protected $serverOptions; |
40
|
|
|
|
41
|
|
|
protected $executor; |
42
|
|
|
|
43
|
|
|
protected $state = self::STATE_INIT; |
44
|
|
|
|
45
|
|
|
protected $stream; |
46
|
|
|
|
47
|
|
|
protected $parser; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* @param LoopInterface $loop |
51
|
|
|
* @param mixed[] $config |
52
|
|
|
*/ |
53
|
|
|
public function __construct(LoopInterface $loop, $config = []) |
54
|
|
|
{ |
55
|
|
|
$this->loop = $loop; |
56
|
|
|
$this->executor = $this->createExecutor(); |
57
|
|
|
$this->config = $this->createConfig($config); |
58
|
|
|
} |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @override |
62
|
|
|
* @inheritDoc |
63
|
|
|
*/ |
64
|
|
|
public function isPaused() |
65
|
|
|
{ |
66
|
|
|
// TODO |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
/** |
70
|
|
|
* @override |
71
|
|
|
* @inheritDoc |
72
|
|
|
*/ |
73
|
|
|
public function pause() |
74
|
|
|
{ |
75
|
|
|
// TODO |
76
|
|
|
} |
77
|
|
|
|
78
|
|
|
/** |
79
|
|
|
* @override |
80
|
|
|
* @inheritDoc |
81
|
|
|
*/ |
82
|
|
|
public function resume() |
83
|
|
|
{ |
84
|
|
|
// TODO |
85
|
|
|
} |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* @override |
89
|
|
|
* @inheritDoc |
90
|
|
|
*/ |
91
|
|
|
public function isStarted() |
92
|
|
|
{ |
93
|
|
|
// TODO |
94
|
|
|
} |
95
|
|
|
|
96
|
|
|
/** |
97
|
|
|
* @override |
98
|
|
|
* @inheritDoc |
99
|
|
|
*/ |
100
|
|
|
public function start() |
101
|
|
|
{ |
102
|
|
|
return new Promise(function($resolve, $reject) { |
103
|
|
|
|
104
|
|
|
$this->state = self::STATE_CONNECT_PENDING; |
105
|
|
|
$options = $this->config; |
106
|
|
|
$streamRef = $this->stream; |
107
|
|
|
|
108
|
|
|
$errorHandler = function ($reason) use ($reject) { |
109
|
|
|
$this->state = self::STATE_AUTH_FAILED; |
110
|
|
|
return $reject($reason); |
111
|
|
|
}; |
112
|
|
|
|
113
|
|
|
$connectedHandler = function ($serverOptions) use ($resolve) { |
114
|
|
|
$this->state = self::STATE_AUTH_SUCCEEDED; |
115
|
|
|
$this->serverOptions = $serverOptions; |
116
|
|
|
return $resolve($serverOptions); |
117
|
|
|
}; |
118
|
|
|
|
119
|
|
|
$this |
120
|
|
|
->connect() |
121
|
|
|
->then(function ($stream) use (&$streamRef, $options, $errorHandler, $connectedHandler) { |
122
|
|
|
$streamRef = $stream; |
123
|
|
|
|
124
|
|
|
$stream->on('error', [ $this, 'handleConnectionError' ]); |
125
|
|
|
$stream->on('close', [ $this, 'handleConnectionClosed' ]); |
126
|
|
|
|
127
|
|
|
$parser = $this->parser = new ProtocolParser($stream, $this->executor); |
128
|
|
|
|
129
|
|
|
$parser->setOptions($options); |
130
|
|
|
|
131
|
|
|
$command = $this->doCommand(new AuthCommand($this)); |
132
|
|
|
$command->on('authenticated', $connectedHandler); |
133
|
|
|
$command->on('error', $errorHandler); |
134
|
|
|
|
135
|
|
|
//$parser->on('close', $closeHandler); |
|
|
|
|
136
|
|
|
$parser->start(); |
137
|
|
|
|
138
|
|
|
}, [ $this, 'handleConnectionError' ]); |
139
|
|
|
}); |
140
|
|
|
} |
141
|
|
|
|
142
|
|
|
/** |
143
|
|
|
* @override |
144
|
|
|
* @inheritDoc |
145
|
|
|
*/ |
146
|
|
|
public function stop() |
147
|
|
|
{ |
148
|
|
|
return new Promise(function($resolve, $reject) { |
|
|
|
|
149
|
|
|
$this |
150
|
|
|
->doCommand(new QuitCommand($this)) |
151
|
|
|
->on('success', function() use($resolve) { |
152
|
|
|
$this->state = self::STATE_STOPPED; |
153
|
|
|
$this->emit('end', [ $this ]); |
154
|
|
|
$this->emit('close', [ $this ]); |
155
|
|
|
$resolve($this); |
156
|
|
|
}); |
157
|
|
|
$this->state = self::STATE_CLOSEING; |
158
|
|
|
}); |
159
|
|
|
} |
160
|
|
|
|
161
|
|
|
/** |
162
|
|
|
* Do a async query. |
163
|
|
|
* |
164
|
|
|
* @param string $sql |
165
|
|
|
* @param mixed[] $sqlParams |
166
|
|
|
* @return PromiseInterface |
167
|
|
|
*/ |
168
|
|
|
public function query($sql, $sqlParams = []) |
169
|
|
|
{ |
170
|
|
|
$promise = new Promise(); |
171
|
|
|
$query = new Query($sql); |
172
|
|
|
$command = new QueryCommand($this); |
173
|
|
|
$command->setQuery($query); |
174
|
|
|
|
175
|
|
|
$query->bindParamsFromArray($sqlParams); |
176
|
|
|
$this->doCommand($command); |
177
|
|
|
|
178
|
|
|
$command->on('results', function ($rows, $command) use ($promise) { |
179
|
|
|
return $promise->resolve($command); |
180
|
|
|
}); |
181
|
|
|
$command->on('error', function ($err, $command) use ($promise) { |
|
|
|
|
182
|
|
|
return $promise->reject($err); |
183
|
|
|
}); |
184
|
|
|
$command->on('success', function ($command) use ($promise) { |
185
|
|
|
return $promise->resolve($command); |
186
|
|
|
}); |
187
|
|
|
|
188
|
|
|
return $promise; |
189
|
|
|
} |
190
|
|
|
|
191
|
|
|
public function ping() |
192
|
|
|
{ |
193
|
|
|
$promise = new Promise(); |
194
|
|
|
|
195
|
|
|
$this->doCommand(new PingCommand($this)) |
196
|
|
|
->on('error', function ($reason) use ($promise) { |
197
|
|
|
return $promise->reject($reason); |
198
|
|
|
}) |
199
|
|
|
->on('success', function () use ($promise) { |
200
|
|
|
return $promise->resolve(); |
201
|
|
|
}); |
202
|
|
|
} |
203
|
|
|
|
204
|
|
|
public function selectDB($dbname) |
205
|
|
|
{ |
206
|
|
|
return $this->query(sprintf('USE `%s`', $dbname)); |
207
|
|
|
} |
208
|
|
|
|
209
|
|
|
public function setOption($name, $value) |
210
|
|
|
{ |
211
|
|
|
$this->config[$name] = $value; |
212
|
|
|
|
213
|
|
|
return $this; |
214
|
|
|
} |
215
|
|
|
|
216
|
|
|
public function getOption($name, $default = null) |
217
|
|
|
{ |
218
|
|
|
if (isset($this->config[$name])) |
219
|
|
|
{ |
220
|
|
|
return $this->config[$name]; |
221
|
|
|
} |
222
|
|
|
|
223
|
|
|
return $default; |
224
|
|
|
} |
225
|
|
|
|
226
|
|
|
public function getState() |
227
|
|
|
{ |
228
|
|
|
return $this->state; |
229
|
|
|
} |
230
|
|
|
|
231
|
|
|
public function handleConnectionError($err) |
232
|
|
|
{ |
233
|
|
|
$this->emit('error', [ $err, $this ]); |
234
|
|
|
} |
235
|
|
|
|
236
|
|
|
public function handleConnectionClosed() |
237
|
|
|
{ |
238
|
|
|
if ($this->state < self::STATE_CLOSEING) |
239
|
|
|
{ |
240
|
|
|
$this->state = self::STATE_STOPPED; |
241
|
|
|
$this->emit('error', [ new RuntimeException('mysql server has gone away'), $this ]); |
242
|
|
|
} |
243
|
|
|
} |
244
|
|
|
|
245
|
|
|
protected function doCommand(CommandInterface $command) |
246
|
|
|
{ |
247
|
|
|
if ($command->equals(Command::INIT_AUTHENTICATE)) |
248
|
|
|
{ |
249
|
|
|
return $this->executor->undequeue($command); |
250
|
|
|
} |
251
|
|
|
elseif ($this->state >= self::STATE_CONNECT_PENDING && $this->state <= self::STATE_AUTH_SUCCEEDED) |
252
|
|
|
{ |
253
|
|
|
return $this->executor->enqueue($command); |
254
|
|
|
} |
255
|
|
|
else |
256
|
|
|
{ |
257
|
|
|
throw new Exception("Cann't send command"); |
258
|
|
|
} |
259
|
|
|
} |
260
|
|
|
|
261
|
|
|
public function getServerOptions() |
262
|
|
|
{ |
263
|
|
|
return $this->serverOptions; |
264
|
|
|
} |
265
|
|
|
|
266
|
|
|
protected function connect() |
267
|
|
|
{ |
268
|
|
|
$socket = new Socket($this->config['endpoint'], $this->getLoop()); |
269
|
|
|
return Promise::doResolve($socket); |
270
|
|
|
} |
271
|
|
|
|
272
|
|
|
/** |
273
|
|
|
* Create executor. |
274
|
|
|
* |
275
|
|
|
* @return Executor |
276
|
|
|
*/ |
277
|
|
|
protected function createExecutor() |
278
|
|
|
{ |
279
|
|
|
return new Executor($this); |
280
|
|
|
} |
281
|
|
|
|
282
|
|
|
/** |
283
|
|
|
* Create configuration file. |
284
|
|
|
* |
285
|
|
|
* @param mixed[] $config |
286
|
|
|
* @return mixed[] |
287
|
|
|
*/ |
288
|
|
|
protected function createConfig($config = []) |
289
|
|
|
{ |
290
|
|
|
$default = [ |
291
|
|
|
'endpoint' => 'tcp://127.0.0.1:3306', |
292
|
|
|
'user' => 'root', |
293
|
|
|
'pass' => '', |
294
|
|
|
'dbname' => '', |
295
|
|
|
]; |
296
|
|
|
return array_merge($default, $config); |
297
|
|
|
} |
298
|
|
|
} |
299
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.