Connection::executeAsync()   A
last analyzed

Complexity

Conditions 2
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 6
Bugs 1 Features 4
Metric Value
c 6
b 1
f 4
dl 0
loc 5
rs 9.4286
cc 2
eloc 3
nc 1
nop 4
1
<?php
2
namespace Cassandra;
3
use Cassandra\Protocol\Frame;
4
5
class Connection {

    /**
     * Options passed to the STARTUP request when the connection is opened.
     * Values given to the constructor are merged over these defaults.
     *
     * @var array
     */
    protected $_options = [
        'CQL_VERSION' => '3.0.0'
    ];

    /**
     * Keyspace selected with a USE query (empty means none is selected).
     *
     * @var string
     */
    protected $_keyspace;

    /**
     * Candidate nodes; each entry is either a host string or an options array.
     *
     * @var array|\Traversable
     */
    protected $_nodes;

    /**
     * Node currently in use, or null while disconnected.
     *
     * @var Connection\Socket|Connection\Stream|null
     */
    protected $_node;

    /**
     * Highest stream id handed out so far on the current connection.
     *
     * @var int
     */
    protected $_lastStreamId = 0;

    /**
     * Asynchronous statements still waiting for a response, keyed by stream id.
     *
     * @var array
     */
    protected $_statements = [];

    /**
     * Stream ids whose response has arrived and which can be handed out again.
     *
     * @var \SplQueue
     */
    protected $_recycledStreams;

    /**
     * Consistency level used when a shorthand method receives null.
     *
     * @var int
     */
    protected $_consistency = Request\Request::CONSISTENCY_ONE;

    /**
     * @param array|\Traversable $nodes    host strings or per-node option arrays
     * @param string             $keyspace keyspace to select after connecting
     * @param array              $options  overrides for the STARTUP options
     */
    public function __construct($nodes, $keyspace = '', array $options = []) {
        // Only plain arrays can be shuffled; a Traversable keeps its own order.
        if (is_array($nodes)) {
            shuffle($nodes);
        }

        $this->_nodes = $nodes;
        $this->_options = array_merge($this->_options, $options);
        $this->_keyspace = $keyspace;
        $this->_recycledStreams = new \SplQueue();
    }

    /**
     * Try each configured node in turn and keep the first one that can be
     * instantiated. A node whose constructor throws is skipped.
     *
     * @throws Exception when a host string is malformed or no node is usable
     */
    protected function _connect() {
        foreach ($this->_nodes as $options) {
            if (is_string($options)) {
                if (!preg_match('/^(((tcp|udp|unix|ssl|tls):\/\/)?[\w\.\-]+)(\:(\d+))?/i', $options, $matches)) {
                    throw new Exception('Invalid host: ' . $options);
                }

                $options = ['host' => $matches[1]];

                if (!empty($matches[5])) {
                    $options['port'] = $matches[5];
                }

                // Use Connection\Stream when protocol prefix is defined.
                try {
                    if (empty($matches[2])) {
                        $this->_node = new Connection\Socket($options);
                    } else {
                        $this->_node = new Connection\Stream($options);
                    }
                } catch (Exception $e) {
                    continue;
                }
            } else {
                // An options array may name its own node class.
                $className = isset($options['class']) ? $options['class'] : 'Cassandra\Connection\Socket';

                try {
                    $this->_node = new $className($options);
                } catch (Exception $e) {
                    continue;
                }
            }

            return;
        }

        throw new Exception("Unable to connect to all Cassandra nodes.");
    }

    /**
     * Close the current node, if any, and forget all per-connection state so
     * that isConnected() reports false and a later request reconnects.
     *
     * @return bool result of the node's close(), or true when already disconnected
     */
    public function disconnect() {
        if ($this->_node === null) {
            return true;
        }

        $node = $this->_node;

        // Reset before closing so the object is consistent even if close() throws.
        // Stream ids and pending statements belong to the connection being dropped.
        $this->_node = null;
        $this->_statements = [];
        $this->_lastStreamId = 0;
        $this->_recycledStreams = new \SplQueue();

        return $node->close();
    }

    /**
     * @return bool true while a node is held
     */
    public function isConnected() {
        return $this->_node !== null;
    }

    /**
     * Hook called by _getResponse() for an event frame that arrives on a
     * non-zero stream with no pending statement. The default implementation
     * does nothing; subclasses may override it.
     *
     * @param Response\Event $response
     */
    public function trigger($response) {
    }

    /**
     * Read frames until one arrives on the requested stream. Frames for other
     * streams are still processed by _getResponse() on the way.
     *
     * @param int $streamId
     * @throws Response\Exception
     * @return Response\Response
     */
    public function getResponse($streamId = 0) {
        do {
            $response = $this->_getResponse();
        } while ($response->getStream() !== $streamId);

        return $response;
    }

    /**
     * Read one frame from the node and turn it into a response object.
     *
     * If the frame belongs to a pending asynchronous statement, the response
     * is handed to that statement and its stream id is queued for reuse.
     *
     * @throws Exception when not connected or the protocol version is unsupported
     * @throws Response\Exception when the opcode is unknown
     * @return Response\Response
     */
    protected function _getResponse() {
        static $responseClassMap = [
            Frame::OPCODE_ERROR         => 'Cassandra\Response\Error',
            Frame::OPCODE_READY         => 'Cassandra\Response\Ready',
            Frame::OPCODE_AUTHENTICATE  => 'Cassandra\Response\Authenticate',
            Frame::OPCODE_SUPPORTED     => 'Cassandra\Response\Supported',
            Frame::OPCODE_RESULT        => 'Cassandra\Response\Result',
            Frame::OPCODE_EVENT         => 'Cassandra\Response\Event',
            Frame::OPCODE_AUTH_SUCCESS  => 'Cassandra\Response\AuthSuccess',
        ];

        // Without a node there is nothing to read from; fail with an exception
        // instead of a fatal "member function on null" error.
        if ($this->_node === null) {
            throw new Exception('Not connected.');
        }

        // 0x83 is the only version byte accepted for incoming frames.
        $version = unpack('C', $this->_node->read(1))[1];
        if ($version !== 0x83) {
            throw new Exception('php-cassandra supports CQL binary protocol v3 only, please upgrade your Cassandra to 2.1 or later.');
        }

        $header = unpack('Cflags/nstream/Copcode/Nlength', $this->_node->read(8));
        $body = $header['length'] === 0 ? '' : $this->_node->read($header['length']);

        if (!isset($responseClassMap[$header['opcode']])) {
            throw new Response\Exception('Unknown response');
        }

        $responseClass = $responseClassMap[$header['opcode']];
        $response = new $responseClass($header, new Response\StreamReader($body));

        $streamId = $header['stream'];
        if ($streamId !== 0) {
            if (isset($this->_statements[$streamId])) {
                // Deliver to the waiting statement and release its stream id.
                $this->_statements[$streamId]->setResponse($response);
                unset($this->_statements[$streamId]);
                $this->_recycledStreams->enqueue($streamId);
            } elseif ($response instanceof Response\Event) {
                $this->trigger($response);
            }
        }

        return $response;
    }

    /**
     * Wait until all statements received response.
     */
    public function flush() {
        while (!empty($this->_statements)) {
            $this->_getResponse();
        }
    }

    /**
     * @return Connection\Node|null the node in use, or null while disconnected
     */
    public function getNode() {
        return $this->_node;
    }

    /**
     * Connect to database: pick a node, send STARTUP, authenticate when the
     * server asks for it, then select the configured keyspace.
     *
     * @throws Exception
     * @return bool
     */
    public function connect() {
        if ($this->_node !== null) {
            return true;
        }

        $this->_connect();

        try {
            $response = $this->syncRequest(new Request\Startup($this->_options));

            if ($response instanceof Response\Authenticate) {
                $nodeOptions = $this->_node->getOptions();

                if (empty($nodeOptions['username']) || empty($nodeOptions['password'])) {
                    throw new Exception('Username and password are required.');
                }

                $this->syncRequest(new Request\AuthResponse($nodeOptions['username'], $nodeOptions['password']));
            }
        } catch (\Exception $e) {
            // The handshake did not complete. Drop the node so isConnected()
            // is false and the next request starts a fresh handshake.
            try {
                $this->disconnect();
            } catch (\Exception $closeError) {
                // Best-effort cleanup; the handshake failure is the error to report.
            }

            throw $e;
        }

        // Kept outside the cleanup above: a failing USE leaves the connection open.
        if (!empty($this->_keyspace)) {
            $this->syncRequest(new Request\Query("USE {$this->_keyspace};"));
        }

        return true;
    }

    /**
     * Send a request and block until its response arrives on the default stream.
     *
     * @param Request\Request $request
     * @throws Exception the exception carried by an error response
     * @return Response\Response
     */
    public function syncRequest(Request\Request $request) {
        if ($this->_node === null) {
            $this->connect();
        }

        $this->_node->write($request->__toString());

        $response = $this->getResponse();

        if ($response instanceof Response\Error) {
            throw $response->getException();
        }

        return $response;
    }

    /**
     * Send a request on its own stream id without waiting for the response.
     *
     * @param Request\Request $request
     * @throws Exception
     * @return Statement handle that receives the response once it is read
     */
    public function asyncRequest(Request\Request $request) {
        if ($this->_node === null) {
            $this->connect();
        }

        $streamId = $this->_getNewStreamId();

        try {
            $request->setStream($streamId);
            $this->_node->write($request->__toString());
        } catch (\Exception $e) {
            // No statement was registered for this id; give it back so it is not lost.
            $this->_recycledStreams->enqueue($streamId);

            throw $e;
        }

        return $this->_statements[$streamId] = new Statement($this, $streamId);
    }

    /**
     * Allocate a stream id: count upwards until 32767, then reuse ids released
     * by completed statements, reading responses until one becomes free.
     *
     * @throws Exception when no id is free and no pending statement can release one
     * @return int
     */
    protected function _getNewStreamId() {
        if ($this->_lastStreamId < 32767) {
            return ++$this->_lastStreamId;
        }

        while ($this->_recycledStreams->isEmpty()) {
            // Ids are only released when a pending statement gets its response.
            // With nothing pending, reading would block without ever freeing one.
            if (empty($this->_statements)) {
                throw new Exception('No stream id is available and no pending statement can release one.');
            }

            $this->_getResponse();
        }

        return $this->_recycledStreams->dequeue();
    }

    /***** Shorthand Methods ******/
    /**
     * Prepare a CQL statement on the server.
     *
     * @param string $cql
     * @throws Exception
     * @return array data returned by the response's getData()
     */
    public function prepare($cql) {
        $response = $this->syncRequest(new Request\Prepare($cql));

        return $response->getData();
    }

    /**
     * Execute a prepared statement and wait for the response.
     *
     * @param string $queryId
     * @param array $values
     * @param int $consistency null uses the connection default
     * @param array $options
     * @throws Exception
     * @return Response\Response
     */
    public function executeSync($queryId, array $values = [], $consistency = null, array $options = []) {
        if ($consistency === null) {
            $consistency = $this->_consistency;
        }

        return $this->syncRequest(new Request\Execute($queryId, $values, $consistency, $options));
    }

    /**
     * Execute a prepared statement without waiting for the response.
     *
     * @param string $queryId
     * @param array $values
     * @param int $consistency null uses the connection default
     * @param array $options
     * @throws Exception
     * @return Statement
     */
    public function executeAsync($queryId, array $values = [], $consistency = null, array $options = []) {
        if ($consistency === null) {
            $consistency = $this->_consistency;
        }

        return $this->asyncRequest(new Request\Execute($queryId, $values, $consistency, $options));
    }

    /**
     * Run a CQL query and wait for the response.
     *
     * @param string $cql
     * @param array $values
     * @param int $consistency null uses the connection default
     * @param array $options
     * @throws Exception
     * @return Response\Response
     */
    public function querySync($cql, array $values = [], $consistency = null, array $options = []) {
        if ($consistency === null) {
            $consistency = $this->_consistency;
        }

        return $this->syncRequest(new Request\Query($cql, $values, $consistency, $options));
    }

    /**
     * Run a CQL query without waiting for the response.
     *
     * @param string $cql
     * @param array $values
     * @param int $consistency null uses the connection default
     * @param array $options
     * @throws Exception
     * @return Statement
     */
    public function queryAsync($cql, array $values = [], $consistency = null, array $options = []) {
        if ($consistency === null) {
            $consistency = $this->_consistency;
        }

        return $this->asyncRequest(new Request\Query($cql, $values, $consistency, $options));
    }

    /**
     * Remember the keyspace and, when already connected, switch to it now.
     * While disconnected the keyspace is only stored; connect() applies it.
     *
     * NOTE(review): the name is interpolated verbatim into the CQL text, so
     * callers must pass a trusted identifier.
     *
     * @param string $keyspace
     * @throws Exception
     * @return Response\Result|null null when no connection is open yet
     */
    public function setKeyspace($keyspace) {
        $this->_keyspace = $keyspace;

        if ($this->_node === null) {
            return null;
        }

        return $this->syncRequest(new Request\Query("USE {$this->_keyspace};"));
    }

    /**
     * Set the consistency level used when a shorthand method receives null.
     *
     * @param int  $consistency
     */
    public function setConsistency($consistency) {
        $this->_consistency = $consistency;
    }
}
376