Passed
Branch testing (e0bf91)
by Roman
08:49
created

Connection::insertBulk()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 4
crap 1
1
<?php
2
3
namespace lroman242\LaravelCassandra;
4
5
use Cassandra;
6
use Cassandra\BatchStatement;
7
use \lroman242\LaravelCassandra\Exceptions\CassandraNotSupportedException;
8
9
class Connection extends \Illuminate\Database\Connection
10
{
11
    const DEFAULT_PAGE_SIZE = 5000;
12
    
13
    /**
14
     * The Cassandra keyspace
15
     *
16
     * @var string
17
     */
18
    protected $keyspace;
19
20
    /**
21
     * The Cassandra cluster
22
     *
23
     * @var \Cassandra\Cluster
24
     */
25
    protected $cluster;
26
27
    /**
28
     * The Cassandra connection handler.
29
     *
30
     * @var \Cassandra\Session
31
     */
32
    protected $session;
33
34
    /**
35
     * The config
36
     *
37
     * @var array
38
     */
39
    protected $config;
40
41
    /**
42
     * Create a new database connection instance.
43
     *
44
     * @param  array   $config
45
     */
46 48
    public function __construct(array $config)
47
    {
48 48
        $this->config = $config;
49 48
        if (empty($this->config['page_size'])) {
50 48
            $this->config['page_size'] = self::DEFAULT_PAGE_SIZE;
51
        }
52
53
        // Create the connection
54 48
        $this->cluster = $this->createCluster($config);
55
56 48
        if (isset($config['keyspace'])) {
57 48
            $keyspaceName = $config['keyspace'];
58
59 48
            $this->keyspace = $keyspaceName;
60 48
            $this->session = $this->cluster->connect($keyspaceName);
61
        }
62
63 48
        $this->useDefaultPostProcessor();
64
65 48
        $this->useDefaultSchemaGrammar();
66
67 48
        $this->setQueryGrammar($this->getDefaultQueryGrammar());
68 48
    }
69
70
    /**
71
     * Begin a fluent query against a database table.
72
     *
73
     * @param  string  $table
74
     * @return Query\Builder
75
     */
76 5
    public function table($table)
77
    {
78 5
        $processor = $this->getPostProcessor();
79
80 5
        $query = new Query\Builder($this, null, $processor);
0 ignored issues
show
Documentation introduced by
$processor is of type object<Illuminate\Databa...y\Processors\Processor>, but the function expects a null|object<lroman242\La...sandra\Query\Processor>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
81
82 5
        return $query->from($table);
83
    }
84
85
    /**
86
     * return Cassandra cluster.
87
     *
88
     * @return \Cassandra\Cluster
89
     */
90 1
    public function getCassandraCluster()
91
    {
92 1
        return $this->cluster;
93
    }
94
95
    /**
96
     * return Cassandra Session.
97
     *
98
     * @return \Cassandra\Session
99
     */
100 3
    public function getCassandraSession()
101
    {
102 3
        return $this->session;
103
    }
104
105
    /**
106
     * Return the Cassandra keyspace
107
     *
108
     * @return string
109
     */
110 1
    public function getKeyspace()
111
    {
112 1
        return $this->keyspace;
113
    }
114
115
    /**
116
     * Create a new Cassandra cluster object.
117
     *
118
     * @param  array   $config
119
     *
120
     * @return \Cassandra\Cluster
121
     */
122 48
    protected function createCluster(array $config)
123
    {
124 48
        $cluster = Cassandra::cluster();
125
126
        // Authentication
127 48
        if (isset($config['username']) && isset($config['password'])) {
128 48
            $cluster->withCredentials($config['username'], $config['password']);
129
        }
130
131
        // Contact Points/Host
132 48
        if (!empty($config['host'])) {
133 48
            $contactPoints = $config['host'];
134
135 48
            if (is_string($contactPoints)) {
136 48
                $contactPoints = explode(',', $contactPoints);
137
            }
138
139 48
            call_user_func_array([$cluster, 'withContactPoints'], (array) $contactPoints);
140
        }
141
142 48
        if (!empty($config['port'])) {
143 48
            $cluster->withPort((int) $config['port']);
144
        }
145
146 48
        $cluster->withDefaultPageSize(intval(!empty($config['page_size']) ? $config['page_size'] : self::DEFAULT_PAGE_SIZE));
147
148 48
        if (isset($config['consistency']) && in_array($config['consistency'], [
149 48
                Cassandra::CONSISTENCY_ANY, Cassandra::CONSISTENCY_ONE, Cassandra::CONSISTENCY_TWO,
150
                Cassandra::CONSISTENCY_THREE, Cassandra::CONSISTENCY_QUORUM, Cassandra::CONSISTENCY_ALL,
151
                Cassandra::CONSISTENCY_SERIAL, Cassandra::CONSISTENCY_QUORUM, Cassandra::CONSISTENCY_LOCAL_QUORUM,
152
                Cassandra::CONSISTENCY_EACH_QUORUM, Cassandra::CONSISTENCY_LOCAL_SERIAL, Cassandra::CONSISTENCY_LOCAL_ONE,
153
            ])) {
154
155 48
            $cluster->withDefaultConsistency($config['consistency']);
156
        }
157
158 48
        if (!empty($config['timeout'])) {
159 48
            $cluster->withDefaultTimeout(intval($config['timeout']));
160
        }
161
162 48
        if (!empty($config['connect_timeout'])) {
163 48
            $cluster->withConnectTimeout(floatval($config['connect_timeout']));
164
        }
165
166 48
        if (!empty($config['request_timeout'])) {
167 48
            $cluster->withRequestTimeout(floatval($config['request_timeout']));
168
        }
169
170 48
        return $cluster->build();
171
    }
172
173
    /**
174
     * Disconnect from the underlying Cassandra connection.
175
     */
176 3
    public function disconnect()
177
    {
178 3
        $this->session->close();
179 3
        $this->session = null;
180 3
    }
181
182
    /**
183
     * Get the PDO driver name.
184
     *
185
     * @return string
186
     */
187 1
    public function getDriverName()
188
    {
189 1
        return 'cassandra';
190
    }
191
192
    /**
193
     * Run a select statement against the database.
194
     *
195
     * @param  string  $query
196
     * @param  array  $bindings
197
     * @param  bool  $useReadPdo
198
     * @param  array  $customOptions
199
     *
200
     * @return mixed
201
     */
202 48
    public function select($query, $bindings = [], $useReadPdo = true, array $customOptions = [])
203
    {
204 48
        return $this->statement($query, $bindings, $customOptions);
205
    }
206
207
    /**
208
     * Run an bulk insert statement against the database.
209
     *
210
     * @param  array  $queries
211
     * @param  array  $bindings
212
     * @param  int  $type
213
     * @param  array  $customOptions
214
     *
215
     * @return bool
216
     */
217 6
    public function insertBulk($queries = [], $bindings = [], $type = Cassandra::BATCH_LOGGED, array $customOptions = [])
218
    {
219 6
        return $this->batchStatement($queries, $bindings, $type, $customOptions);
220
    }
221
222
    /**
223
     * Execute a group of queries inside a batch statement against the database.
224
     *
225
     * @param  array  $queries
226
     * @param  array  $bindings
227
     * @param  int  $type
228
     * @param  array  $customOptions
229
     *
230
     * @return bool
231
     */
232
    public function batchStatement($queries = [], $bindings = [], $type = Cassandra::BATCH_LOGGED, array $customOptions = [])
233
    {
234 7
        return $this->run($queries, $bindings, function ($queries, $bindings) use ($type, $customOptions) {
0 ignored issues
show
Documentation introduced by
$queries is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
235 7
            if ($this->pretending()) {
236 1
                return [];
237
            }
238
239 6
            $batch = new BatchStatement($type);
240
241 6
            foreach ($queries as $k => $query) {
242 6
                $preparedStatement = $this->session->prepare($query);
243 6
                $batch->add($preparedStatement, $bindings[$k]);
244
            }
245
246 6
            return $this->session->execute($batch, $customOptions);
247 7
        });
248
    }
249
250
    /**
251
     * Execute an CQL statement and return the boolean result.
252
     *
253
     * @param  string  $query
254
     * @param  array   $bindings
255
     * @param  array  $customOptions
256
     *
257
     * @return mixed
258
     */
259 48
    public function statement($query, $bindings = [], array $customOptions = [])
260
    {
261 48
        return $this->runStatement($query, $bindings, $customOptions);
262
    }
263
264
    /**
265
     * Because Cassandra is an eventually consistent database, it's not possible to obtain
266
     * the affected count for statements so we're just going to return 0, based on the idea
267
     * that if the query fails somehow, an exception will be thrown
268
     *
269
     * @param  string  $query
270
     * @param  array   $bindings
271
     * @param  array  $customOptions
272
     *
273
     * @return int
274
     */
275 10
    public function affectingStatement($query, $bindings = [], array $customOptions = [])
276
    {
277 10
        return $this->runStatement($query, $bindings, $customOptions, 0, 1);
278
    }
279
280
    /**
281
     * @inheritdoc
282
     */
283 48
    protected function getDefaultPostProcessor()
284
    {
285 48
        return new Query\Processor();
286
    }
287
288
    /**
289
     * @inheritdoc
290
     */
291 48
    protected function getDefaultQueryGrammar()
292
    {
293 48
        return new Query\Grammar();
294
    }
295
296
    /**
297
     * @inheritdoc
298
     */
299 48
    protected function getDefaultSchemaGrammar()
300
    {
301
        //return new Schema\Grammar();
302 48
    }
303
304
    /**
305
     * Reconnect to the database if connection is missing.
306
     *
307
     * @return void
308
     */
309 48
    protected function reconnectIfMissingConnection()
310
    {
311 48
        if (is_null($this->session)) {
312 1
            $this->session = $this->createCluster($this->config)->connect($this->keyspace);
313
        }
314 48
    }
315
316
    /**
317
     * Dynamically pass methods to the connection.
318
     *
319
     * @param  string  $method
320
     * @param  array   $parameters
321
     * @return mixed
322
     */
323 1
    public function __call($method, $parameters)
324
    {
325 1
        return call_user_func_array([$this->session, $method], $parameters);
326
    }
327
328
    /**
329
     * Execute an CQL statement and return the boolean result.
330
     *
331
     * @param string $query
332
     * @param array $bindings
333
     * @param array $customOptions
334
     * @param mixed $defaultFailed
335
     * @param mixed $defaultSuccess
336
     *
337
     * @return mixed
338
     */
339
    protected function runStatement($query, $bindings = [], array $customOptions = [], $defaultFailed = [], $defaultSuccess = null)
340
    {
341 48
        return $this->run($query, $bindings, function ($query, $bindings) use ($customOptions, $defaultFailed, $defaultSuccess) {
342 48
            if ($this->pretending()) {
343 2
                return $defaultFailed;
344
            }
345
346 48
            $preparedStatement = $this->session->prepare($query);
347
348
            //Add bindings
349 48
            $customOptions['arguments'] = $bindings;
350
351 48
            $result = $this->session->execute($preparedStatement, $customOptions);
352
353 48
            return $defaultSuccess === null ? $result : $defaultSuccess;
354 48
        });
355
    }
356
357
    /**
358
     * @inheritDoc
359
     */
360 1
    public function transaction(\Closure $callback, $attempts = 1)
361
    {
362 1
        throw new CassandraNotSupportedException("Transactions is not supported by Cassandra database");
363
    }
364
365
    /**
366
     * @inheritDoc
367
     */
368 1
    public function beginTransaction()
369
    {
370 1
        throw new CassandraNotSupportedException("Transactions is not supported by Cassandra database");
371
    }
372
373
    /**
374
     * @inheritDoc
375
     */
376 1
    public function commit()
377
    {
378 1
        throw new CassandraNotSupportedException("Transactions is not supported by Cassandra database");
379
    }
380
381
    /**
382
     * @inheritDoc
383
     */
384 1
    public function rollBack()
385
    {
386 1
        throw new CassandraNotSupportedException("Transactions is not supported by Cassandra database");
387
    }
388
389
    /**
390
     * @inheritDoc
391
     */
392 1
    public function transactionLevel()
393
    {
394 1
        throw new CassandraNotSupportedException("Transactions is not supported by Cassandra database");
395
    }
396
397
    //TODO: override isDoctrineAvailable method
398
    //TODO: override getDoctrineColumn method
399
    //TODO: override getDoctrineSchemaManager method
400
    //TODO: override getDoctrineConnection method
401
    //TODO: override getPdo method
402
    //TODO: override getReadPdo method
403
    //TODO: override setPdo method
404
    //TODO: override setReadPdo method
405
    //TODO: override setReconnector method
406
    //TODO: override reconnect method
407
    //TODO: override query method
408
409
    //TODO: override bindValues method
410
    //TODO: override cursor method
411
    //TODO: override unprepared method
412
413
    //TODO: check prepareBindings method
414
415
    //TODO: provide interface for $this->session->executeAsync
416
    //TODO: provide interface for $this->session->prepareAsync
417
    //TODO: provide interface for $this->session->closeAsync
418
    //TODO: provide interface for $this->session->schema
419
}
420