1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dazzle\MySQL; |
4
|
|
|
|
5
|
|
|
use Dazzle\Event\BaseEventEmitter; |
6
|
|
|
use Dazzle\Loop\LoopAwareTrait; |
7
|
|
|
use Dazzle\Loop\LoopInterface; |
8
|
|
|
use Dazzle\MySQL\Command\Command; |
9
|
|
|
use Dazzle\MySQL\Command\CommandInterface; |
10
|
|
|
use Dazzle\MySQL\Command\Concrete\AuthCommand; |
11
|
|
|
use Dazzle\MySQL\Command\Concrete\PingCommand; |
12
|
|
|
use Dazzle\MySQL\Command\Concrete\QueryCommand; |
13
|
|
|
use Dazzle\MySQL\Command\Concrete\QuitCommand; |
14
|
|
|
use Dazzle\MySQL\Protocol\ProtocolParser; |
15
|
|
|
use Dazzle\Promise\Promise; |
16
|
|
|
use Dazzle\Promise\PromiseInterface; |
17
|
|
|
use Dazzle\Socket\Socket; |
18
|
|
|
use Dazzle\Socket\SocketInterface; |
19
|
|
|
use Exception; |
20
|
|
|
use InvalidArgumentException; |
21
|
|
|
use RuntimeException; |
22
|
|
|
|
23
|
|
|
class Database extends BaseEventEmitter implements DatabaseInterface |
24
|
|
|
{ |
25
|
|
|
use LoopAwareTrait; |
26
|
|
|
|
27
|
|
|
const STATE_INIT = 0; |
28
|
|
|
const STATE_CONNECT_PENDING = 3; |
29
|
|
|
const STATE_CONNECT_FAILED = 1; |
30
|
|
|
const STATE_CONNECT_SUCCEEDED = 4; |
31
|
|
|
const STATE_AUTH_PENDING = 8; |
32
|
|
|
const STATE_AUTH_FAILED = 2; |
33
|
|
|
const STATE_AUTH_SUCCEEDED = 5; |
34
|
|
|
const STATE_CLOSEING = 6; |
35
|
|
|
const STATE_STOPPED = 7; |
36
|
|
|
|
37
|
|
|
protected $config; |
38
|
|
|
|
39
|
|
|
protected $serverOptions; |
40
|
|
|
|
41
|
|
|
protected $executor; |
42
|
|
|
|
43
|
|
|
protected $state = self::STATE_INIT; |
44
|
|
|
|
45
|
|
|
protected $stream; |
46
|
|
|
|
47
|
|
|
protected $parser; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* @param LoopInterface $loop |
51
|
|
|
* @param mixed[] $config |
52
|
|
|
*/ |
53
|
|
|
public function __construct(LoopInterface $loop, $config = []) |
54
|
|
|
{ |
55
|
|
|
$this->loop = $loop; |
56
|
|
|
$this->executor = $this->createExecutor(); |
57
|
|
|
$this->config = $this->createConfig($config); |
58
|
|
|
} |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @override |
62
|
|
|
* @inheritDoc |
63
|
|
|
*/ |
64
|
|
|
public function isPaused() |
65
|
|
|
{ |
66
|
|
|
// TODO |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
/** |
70
|
|
|
* @override |
71
|
|
|
* @inheritDoc |
72
|
|
|
*/ |
73
|
|
|
public function pause() |
74
|
|
|
{ |
75
|
|
|
// TODO |
76
|
|
|
} |
77
|
|
|
|
78
|
|
|
/** |
79
|
|
|
* @override |
80
|
|
|
* @inheritDoc |
81
|
|
|
*/ |
82
|
|
|
public function resume() |
83
|
|
|
{ |
84
|
|
|
// TODO |
85
|
|
|
} |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* @override |
89
|
|
|
* @inheritDoc |
90
|
|
|
*/ |
91
|
|
|
public function isStarted() |
92
|
|
|
{ |
93
|
|
|
// TODO |
94
|
|
|
} |
95
|
|
|
|
96
|
|
|
/** |
97
|
|
|
* @override |
98
|
|
|
* @inheritDoc |
99
|
|
|
*/ |
100
|
|
|
public function start() |
101
|
|
|
{ |
102
|
|
|
$this->state = self::STATE_CONNECT_PENDING; |
103
|
|
|
$options = $this->config; |
104
|
|
|
$streamRef = $this->stream; |
105
|
|
|
$args = func_get_args(); |
106
|
|
|
|
107
|
|
|
if (!$args) { |
|
|
|
|
108
|
|
|
throw new Exception('Not Implemented'); |
109
|
|
|
} |
110
|
|
|
|
111
|
|
|
$errorHandler = function ($reason) use ($args) { |
112
|
|
|
$this->state = self::STATE_AUTH_FAILED; |
113
|
|
|
$args[0]($reason, $this); |
114
|
|
|
}; |
115
|
|
|
|
116
|
|
|
$connectedHandler = function ($serverOptions) use ($args) { |
117
|
|
|
$this->state = self::STATE_AUTH_SUCCEEDED; |
118
|
|
|
$this->serverOptions = $serverOptions; |
119
|
|
|
$args[0](null, $this); |
120
|
|
|
}; |
121
|
|
|
|
122
|
|
|
$this |
123
|
|
|
->connect() |
124
|
|
|
->then(function ($stream) use (&$streamRef, $options, $errorHandler, $connectedHandler) { |
125
|
|
|
$streamRef = $stream; |
126
|
|
|
|
127
|
|
|
$stream->on('error', [ $this, 'handleConnectionError' ]); |
128
|
|
|
$stream->on('close', [ $this, 'handleConnectionClosed' ]); |
129
|
|
|
|
130
|
|
|
$parser = $this->parser = new ProtocolParser($stream, $this->executor); |
131
|
|
|
|
132
|
|
|
$parser->setOptions($options); |
133
|
|
|
|
134
|
|
|
$command = $this->doCommand(new AuthCommand($this)); |
135
|
|
|
$command->on('authenticated', $connectedHandler); |
136
|
|
|
$command->on('error', $errorHandler); |
137
|
|
|
|
138
|
|
|
//$parser->on('close', $closeHandler); |
|
|
|
|
139
|
|
|
$parser->start(); |
140
|
|
|
|
141
|
|
|
}, [ $this, 'handleConnectionError' ]); |
142
|
|
|
} |
143
|
|
|
|
144
|
|
|
/** |
145
|
|
|
* @override |
146
|
|
|
* @inheritDoc |
147
|
|
|
*/ |
148
|
|
|
public function stop() |
149
|
|
|
{ |
150
|
|
|
$this |
151
|
|
|
->doCommand(new QuitCommand($this)) |
152
|
|
|
->on('success', function() { |
153
|
|
|
$this->state = self::STATE_STOPPED; |
154
|
|
|
$this->emit('end', [ $this ]); |
155
|
|
|
$this->emit('close', [ $this ]); |
156
|
|
|
}); |
157
|
|
|
$this->state = self::STATE_CLOSEING; |
158
|
|
|
} |
159
|
|
|
|
160
|
|
|
/** |
161
|
|
|
* Do a async query. |
162
|
|
|
*/ |
163
|
|
|
public function query() |
164
|
|
|
{ |
165
|
|
|
$numArgs = func_num_args(); |
166
|
|
|
|
167
|
|
|
if ($numArgs === 0) |
168
|
|
|
{ |
169
|
|
|
throw new InvalidArgumentException('Required at least 1 argument'); |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
$args = func_get_args(); |
173
|
|
|
$query = new Query(array_shift($args)); |
174
|
|
|
|
175
|
|
|
$callback = array_pop($args); |
176
|
|
|
|
177
|
|
|
$command = new QueryCommand($this); |
178
|
|
|
$command->setQuery($query); |
179
|
|
|
|
180
|
|
|
if (!is_callable($callback)) |
181
|
|
|
{ |
182
|
|
|
if ($numArgs > 1) |
183
|
|
|
{ |
184
|
|
|
$args[] = $callback; |
185
|
|
|
} |
186
|
|
|
$query->bindParamsFromArray($args); |
187
|
|
|
|
188
|
|
|
return $this->doCommand($command); |
189
|
|
|
} |
190
|
|
|
|
191
|
|
|
$query->bindParamsFromArray($args); |
192
|
|
|
$this->doCommand($command); |
193
|
|
|
|
194
|
|
|
$command->on('results', function ($rows, $command) use ($callback) { |
195
|
|
|
$callback($command, $this); |
196
|
|
|
}); |
197
|
|
|
$command->on('error', function ($err, $command) use ($callback) { |
198
|
|
|
$callback($command, $this); |
199
|
|
|
}); |
200
|
|
|
$command->on('success', function ($command) use ($callback) { |
201
|
|
|
$callback($command, $this); |
202
|
|
|
}); |
203
|
|
|
} |
204
|
|
|
|
205
|
|
|
public function ping($callback) |
206
|
|
|
{ |
207
|
|
|
if (!is_callable($callback)) |
208
|
|
|
{ |
209
|
|
|
throw new InvalidArgumentException('Callback is not a valid callable'); |
210
|
|
|
} |
211
|
|
|
$this->doCommand(new PingCommand($this)) |
212
|
|
|
->on('error', function ($reason) use ($callback) { |
213
|
|
|
$callback($reason, $this); |
214
|
|
|
}) |
215
|
|
|
->on('success', function () use ($callback) { |
216
|
|
|
$callback(null, $this); |
217
|
|
|
}); |
218
|
|
|
} |
219
|
|
|
|
220
|
|
|
public function selectDB($dbname) |
221
|
|
|
{ |
222
|
|
|
return $this->query(sprintf('USE `%s`', $dbname)); |
223
|
|
|
} |
224
|
|
|
|
225
|
|
|
public function setOption($name, $value) |
226
|
|
|
{ |
227
|
|
|
$this->config[$name] = $value; |
228
|
|
|
|
229
|
|
|
return $this; |
230
|
|
|
} |
231
|
|
|
|
232
|
|
|
public function getOption($name, $default = null) |
233
|
|
|
{ |
234
|
|
|
if (isset($this->config[$name])) |
235
|
|
|
{ |
236
|
|
|
return $this->config[$name]; |
237
|
|
|
} |
238
|
|
|
|
239
|
|
|
return $default; |
240
|
|
|
} |
241
|
|
|
|
242
|
|
|
public function getState() |
243
|
|
|
{ |
244
|
|
|
return $this->state; |
245
|
|
|
} |
246
|
|
|
|
247
|
|
|
public function handleConnectionError($err) |
248
|
|
|
{ |
249
|
|
|
$this->emit('error', [ $err, $this ]); |
250
|
|
|
} |
251
|
|
|
|
252
|
|
|
public function handleConnectionClosed() |
253
|
|
|
{ |
254
|
|
|
if ($this->state < self::STATE_CLOSEING) |
255
|
|
|
{ |
256
|
|
|
$this->state = self::STATE_STOPPED; |
257
|
|
|
$this->emit('error', [ new RuntimeException('mysql server has gone away'), $this ]); |
258
|
|
|
} |
259
|
|
|
} |
260
|
|
|
|
261
|
|
|
protected function doCommand(CommandInterface $command) |
262
|
|
|
{ |
263
|
|
|
if ($command->equals(Command::INIT_AUTHENTICATE)) |
264
|
|
|
{ |
265
|
|
|
return $this->executor->undequeue($command); |
266
|
|
|
} |
267
|
|
|
elseif ($this->state >= self::STATE_CONNECT_PENDING && $this->state <= self::STATE_AUTH_SUCCEEDED) |
268
|
|
|
{ |
269
|
|
|
return $this->executor->enqueue($command); |
270
|
|
|
} |
271
|
|
|
else |
272
|
|
|
{ |
273
|
|
|
throw new Exception("Cann't send command"); |
274
|
|
|
} |
275
|
|
|
} |
276
|
|
|
|
277
|
|
|
public function getServerOptions() |
278
|
|
|
{ |
279
|
|
|
return $this->serverOptions; |
280
|
|
|
} |
281
|
|
|
|
282
|
|
|
protected function connect() |
283
|
|
|
{ |
284
|
|
|
$socket = new Socket($this->config['endpoint'], $this->getLoop()); |
285
|
|
|
return Promise::doResolve($socket); |
286
|
|
|
} |
287
|
|
|
|
288
|
|
|
/** |
289
|
|
|
* Create executor. |
290
|
|
|
* |
291
|
|
|
* @return Executor |
292
|
|
|
*/ |
293
|
|
|
protected function createExecutor() |
294
|
|
|
{ |
295
|
|
|
return new Executor($this); |
296
|
|
|
} |
297
|
|
|
|
298
|
|
|
/** |
299
|
|
|
* Create configuration file. |
300
|
|
|
* |
301
|
|
|
* @param mixed[] $config |
302
|
|
|
* @return mixed[] |
303
|
|
|
*/ |
304
|
|
|
protected function createConfig($config = []) |
305
|
|
|
{ |
306
|
|
|
$default = [ |
307
|
|
|
'endpoint' => 'tcp://127.0.0.1:3306', |
308
|
|
|
'user' => 'root', |
309
|
|
|
'pass' => '', |
310
|
|
|
'dbname' => '', |
311
|
|
|
]; |
312
|
|
|
return array_merge($default, $config); |
313
|
|
|
} |
314
|
|
|
} |
315
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.