Completed
Push — master ( 3a2d29...d0475e )
by Nicolas
02:57
created

lib/Elastica/Client.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace Elastica;
3
4
use Elastica\Bulk\Action;
5
use Elastica\Exception\ConnectionException;
6
use Elastica\Exception\InvalidException;
7
use Elastica\Script\AbstractScript;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\NullLogger;
10
11
/**
12
 * Client to connect to the elasticsearch server.
13
 *
14
 * @author Nicolas Ruflin <[email protected]>
15
 */
16
class Client
17
{
18
    /**
19
     * Config with defaults.
20
     *
21
     * log: Set to true, to enable logging, set a string to log to a specific file
22
     * retryOnConflict: Use in \Elastica\Client::updateDocument
23
     * bigintConversion: Set to true to enable the JSON bigint to string conversion option (see issue #717)
24
     *
25
     * @var array
26
     */
27
    protected $_config = [
28
        'host' => null,
29
        'port' => null,
30
        'path' => null,
31
        'url' => null,
32
        'proxy' => null,
33
        'transport' => null,
34
        'persistent' => true,
35
        'timeout' => null,
36
        'connections' => [], // host, port, path, timeout, transport, compression, persistent, timeout, config -> (curl, headers, url)
37
        'roundRobin' => false,
38
        'log' => false,
39
        'retryOnConflict' => 0,
40
        'bigintConversion' => false,
41
        'username' => null,
42
        'password' => null,
43
    ];
44
45
    /**
46
     * @var callback
47
     */
48
    protected $_callback;
49
50
    /**
51
     * @var Connection\ConnectionPool
52
     */
53
    protected $_connectionPool;
54
55
    /**
56
     * @var \Elastica\Request|null
57
     */
58
    protected $_lastRequest;
59
60
    /**
61
     * @var \Elastica\Response|null
62
     */
63
    protected $_lastResponse;
64
65
    /**
66
     * @var LoggerInterface
67
     */
68
    protected $_logger;
69
70
    /**
71
     * @var string
72
     */
73
    protected $_version;
74
75
    /**
76
     * Creates a new Elastica client.
77
     *
78
     * @param array           $config   OPTIONAL Additional config options
79
     * @param callback        $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
80
     * @param LoggerInterface $logger
81
     */
82
    public function __construct(array $config = [], $callback = null, LoggerInterface $logger = null)
    {
        $this->_callback = $callback;

        // Build a Log instance only when no logger was injected and the
        // 'log' option is present with a truthy value.
        $logRequested = isset($config['log']) && $config['log'];
        if (!$logger && $logRequested) {
            $logger = new Log($config['log']);
        }

        // Still no logger: fall back to a NullLogger.
        if (!$logger) {
            $logger = new NullLogger();
        }
        $this->_logger = $logger;

        // Merge the given options over the defaults, then build the connection pool.
        $this->setConfig($config);
        $this->_initConnections();
    }
94
95
    /**
96
     * Get current version.
97
     *
98
     * @return string
99
     */
100
    public function getVersion()
    {
        // Query the server root only while no (truthy) version is cached.
        if (!$this->_version) {
            $rootInfo = $this->request('/')->getData();
            $this->_version = $rootInfo['version']['number'];
        }

        return $this->_version;
    }
110
111
    /**
112
     * Inits the client connections.
113
     */
114
    protected function _initConnections()
    {
        $pool = [];

        // Entries listed under 'connections' come first ...
        foreach ($this->getConfig('connections') as $connectionConfig) {
            $pool[] = Connection::create($this->_prepareConnectionParams($connectionConfig));
        }

        // ... followed by the entries of the optional 'servers' key.
        if (isset($this->_config['servers'])) {
            foreach ($this->getConfig('servers') as $serverConfig) {
                $pool[] = Connection::create($this->_prepareConnectionParams($serverConfig));
            }
        }

        // Nothing configured explicitly: derive a single connection from the client config itself.
        if (!$pool) {
            $pool[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
        }

        // Without an explicit strategy, the 'roundRobin' flag picks between the two built-in names.
        if (!isset($this->_config['connectionStrategy'])) {
            $strategyName = $this->getConfig('roundRobin') === true ? 'RoundRobin' : 'Simple';
            $this->setConfigValue('connectionStrategy', $strategyName);
        }

        $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));

        $this->_connectionPool = new Connection\ConnectionPool($pool, $strategy, $this->_callback);
    }
145
146
    /**
147
     * Creates a Connection params array from a Client or server config array.
148
     *
149
     * @param array $config
150
     *
151
     * @return array
152
     */
153
    protected function _prepareConnectionParams(array $config)
    {
        // Keys that are moved into the nested 'config' entry; every other key
        // stays at the top level of the returned params.
        $nestedKeys = ['bigintConversion', 'curl', 'headers', 'url'];

        $params = ['config' => []];
        foreach ($config as $key => $value) {
            // Strict comparison is required: with loose comparison an integer key
            // such as 0 equals any non-numeric string under PHP < 8 and would be
            // misfiled as a nested config key.
            if (in_array($key, $nestedKeys, true)) {
                $params['config'][$key] = $value;
            } else {
                $params[$key] = $value;
            }
        }

        return $params;
    }
167
168
    /**
169
     * Sets specific config values (updates and keeps default values).
170
     *
171
     * @param array $config Params
172
     *
173
     * @return $this
174
     */
175
    public function setConfig(array $config)
    {
        // Overwrite matching keys and append new ones; keys that are not part
        // of $config keep their current value.
        $this->_config = array_replace($this->_config, $config);

        return $this;
    }
183
184
    /**
185
     * Returns a specific config key or the whole
186
     * config array if not set.
187
     *
188
     * @param string $key Config key
189
     *
190
     * @throws \Elastica\Exception\InvalidException
191
     *
192
     * @return array|string Config value
193
     */
194
    public function getConfig($key = '')
    {
        // An empty key (the default) asks for the complete config array.
        if (empty($key)) {
            return $this->_config;
        }

        // array_key_exists() so that keys holding null still count as set.
        if (array_key_exists($key, $this->_config)) {
            return $this->_config[$key];
        }

        throw new InvalidException('Config key is not set: '.$key);
    }
206
207
    /**
208
     * Sets / overwrites a specific config value.
209
     *
210
     * @param string $key   Key to set
211
     * @param mixed  $value Value
212
     *
213
     * @return $this
214
     */
215
    public function setConfigValue($key, $value)
    {
        // Wrap the pair in a one-entry array and hand it to setConfig(),
        // passing its return value through.
        $singleEntry = [$key => $value];

        return $this->setConfig($singleEntry);
    }
219
220
    /**
221
     * @param array|string $keys    config key or path of config keys
222
     * @param mixed        $default default value will be returned if key was not found
223
     *
224
     * @return mixed
225
     */
226
    public function getConfigValue($keys, $default = null)
    {
        // A single key is treated as a path of length one.
        $path = (array) $keys;
        $current = $this->_config;

        foreach ($path as $segment) {
            // isset() is false for missing keys and for null values alike;
            // both cases yield the default.
            if (!isset($current[$segment])) {
                return $default;
            }

            $current = $current[$segment];
        }

        return $current;
    }
239
240
    /**
241
     * Returns the index for the given connection.
242
     *
243
     * @param string $name Index name to create connection to
244
     *
245
     * @return \Elastica\Index Index for the given name
246
     */
247
    public function getIndex($name)
    {
        // The index object is bound to this client and the given name.
        $index = new Index($this, $name);

        return $index;
    }
251
252
    /**
253
     * Adds a HTTP Header.
254
     *
255
     * @param string $header      The HTTP Header
256
     * @param string $headerValue The HTTP Header Value
257
     *
258
     * @throws \Elastica\Exception\InvalidException If $header or $headerValue is not a string
259
     *
260
     * @return $this
261
     */
262 View Code Duplication
    public function addHeader($header, $headerValue)
    {
        // Both the header name and its value must be strings.
        if (!is_string($header) || !is_string($headerValue)) {
            throw new InvalidException('Header must be a string');
        }

        $this->_config['headers'][$header] = $headerValue;

        return $this;
    }
272
273
    /**
274
     * Remove a HTTP Header.
275
     *
276
     * @param string $header The HTTP Header to remove
277
     *
278
     * @throws \Elastica\Exception\InvalidException If $header is not a string
279
     *
280
     * @return $this
281
     */
282 View Code Duplication
    public function removeHeader($header)
    {
        if (!is_string($header)) {
            throw new InvalidException('Header must be a string');
        }

        // 'headers' is not part of the default config, so it may be missing
        // entirely when no header was ever added. Guard the lookup instead of
        // passing an undefined index to array_key_exists(), which raises a
        // notice plus a warning (a TypeError on PHP 8).
        if (isset($this->_config['headers']) && is_array($this->_config['headers'])) {
            // unset() is a no-op for a header name that is not present.
            unset($this->_config['headers'][$header]);
        }

        return $this;
    }
294
295
    /**
296
     * Uses _bulk to send documents to the server.
297
     *
298
     * Array of \Elastica\Document as input. Index and type have to be
299
     * set inside the document, because for bulk settings documents,
300
     * documents can belong to any type and index
301
     *
302
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
303
     *
304
     * @param array|\Elastica\Document[] $docs Array of Elastica\Document
305
     *
306
     * @throws \Elastica\Exception\InvalidException If docs is empty
307
     *
308
     * @return \Elastica\Bulk\ResponseSet Response object
309
     */
310 View Code Duplication
    public function updateDocuments(array $docs)
    {
        // An empty bulk request is rejected up front.
        if (0 === count($docs)) {
            throw new InvalidException('Array has to consist of at least one element');
        }

        // Queue every document as an "update" action and send the batch.
        $updateBulk = new Bulk($this);
        $updateBulk->addDocuments($docs, Action::OP_TYPE_UPDATE);

        return $updateBulk->send();
    }
322
323
    /**
324
     * Uses _bulk to send documents to the server.
325
     *
326
     * Array of \Elastica\Document as input. Index and type have to be
327
     * set inside the document, because for bulk settings documents,
328
     * documents can belong to any type and index
329
     *
330
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
331
     *
332
     * @param array|\Elastica\Document[] $docs Array of Elastica\Document
333
     *
334
     * @throws \Elastica\Exception\InvalidException If docs is empty
335
     *
336
     * @return \Elastica\Bulk\ResponseSet Response object
337
     */
338 View Code Duplication
    public function addDocuments(array $docs)
    {
        // An empty bulk request is rejected up front.
        if (0 === count($docs)) {
            throw new InvalidException('Array has to consist of at least one element');
        }

        // Queue the documents with the Bulk default operation type and send the batch.
        $addBulk = new Bulk($this);
        $addBulk->addDocuments($docs);

        return $addBulk->send();
    }
350
351
    /**
352
     * Update document, using update script. Requires elasticsearch >= 0.19.0.
353
     *
354
     * @param int|string                                               $id      document id
355
     * @param array|\Elastica\Script\AbstractScript|\Elastica\Document $data    raw data for request body
356
     * @param string                                                   $index   index to update
357
     * @param string                                                   $type    type of index to update
358
     * @param array                                                    $options array of query params to use for query. For possible options check es api
359
     *
360
     * @return \Elastica\Response
361
     *
362
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
363
     */
364
    public function updateDocument($id, $data, $index, $type, array $options = [])
    {
        $path = implode('/', [$index, $type, $id, '_update']);

        $isScript = $data instanceof AbstractScript;
        $isDocument = $data instanceof Document;

        if ($isScript) {
            $requestData = $data->toArray();
        } elseif ($isDocument) {
            $requestData = ['doc' => $data->getData()];

            if ($data->getDocAsUpsert()) {
                $requestData['doc_as_upsert'] = true;
            }

            // Options stored on the document only fill gaps: the array union
            // keeps every value the caller already put into $options.
            $forwardedOptionNames = [
                'version',
                'version_type',
                'routing',
                'percolate',
                'parent',
                'fields',
                'retry_on_conflict',
                'consistency',
                'replication',
                'refresh',
                'timeout',
            ];
            $options += $data->getOptions($forwardedOptionNames);

            // Ask for the whole _source when auto-populate is active (on the
            // document or via the client config) and no 'fields' option was chosen.
            $autoPopulate = $data->isAutoPopulate()
                || $this->getConfigValue(['document', 'autoPopulate'], false);
            if ($autoPopulate && !isset($options['fields'])) {
                $options['fields'] = '_source';
            }
        } else {
            // Anything else is sent as the raw request body.
            $requestData = $data;
        }

        // Scripts and documents may carry an upsert document of their own.
        if (($isScript || $isDocument) && $data->hasUpsert()) {
            $requestData['upsert'] = $data->getUpsert()->getData();
        }

        // Fall back to the client-wide retry setting when none was given.
        if (!isset($options['retry_on_conflict'])) {
            $options['retry_on_conflict'] = $this->getConfig('retryOnConflict');
        }

        $response = $this->request($path, Request::POST, $requestData, $options);

        $populate = $response->isOk()
            && $isDocument
            && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false));
        if (!$populate) {
            return $response;
        }

        // Write the new version and, if fields were requested, the returned
        // field data back into the passed document.
        $responseData = $response->getData();
        if (isset($responseData['_version'])) {
            $data->setVersion($responseData['_version']);
        }
        if (isset($options['fields'])) {
            $this->_populateDocumentFieldsFromResponse($response, $data, $options['fields']);
        }

        return $response;
    }
433
434
    /**
435
     * @param \Elastica\Response $response
436
     * @param \Elastica\Document $document
437
     * @param string             $fields   Array of field names to be populated or '_source' if whole document data should be updated
438
     */
439
    protected function _populateDocumentFieldsFromResponse(Response $response, Document $document, $fields)
    {
        $payload = $response->getData();

        // '_source' means: replace the whole document data with the returned source.
        if ('_source' == $fields) {
            $source = isset($payload['get']['_source']) ? $payload['get']['_source'] : null;
            if (is_array($source)) {
                $document->setData($source);
            }

            return;
        }

        // Otherwise $fields is a comma separated list of field names.
        $fieldNames = explode(',', $fields);
        $documentData = $document->getData();

        foreach ($fieldNames as $fieldName) {
            if (isset($payload['get']['fields'][$fieldName])) {
                // Field came back in the response: take over its value.
                $documentData[$fieldName] = $payload['get']['fields'][$fieldName];
                continue;
            }

            // Field was requested but not returned: drop a previously set value.
            if (isset($documentData[$fieldName])) {
                unset($documentData[$fieldName]);
            }
        }

        $document->setData($documentData);
    }
459
460
    /**
461
     * Bulk deletes documents.
462
     *
463
     * @param array|\Elastica\Document[] $docs
464
     *
465
     * @throws \Elastica\Exception\InvalidException
466
     *
467
     * @return \Elastica\Bulk\ResponseSet
468
     */
469 View Code Duplication
    public function deleteDocuments(array $docs)
    {
        // An empty bulk request is rejected up front.
        if (0 === count($docs)) {
            throw new InvalidException('Array has to consist of at least one element');
        }

        // Queue every document as a "delete" action and send the batch.
        $deleteBulk = new Bulk($this);
        $deleteBulk->addDocuments($docs, Action::OP_TYPE_DELETE);

        return $deleteBulk->send();
    }
480
481
    /**
482
     * Returns the status object for all indices.
483
     *
484
     * @return \Elastica\Status Status object
485
     */
486
    public function getStatus()
    {
        // A fresh Status object bound to this client is created on every call.
        $status = new Status($this);

        return $status;
    }
490
491
    /**
492
     * Returns the current cluster.
493
     *
494
     * @return \Elastica\Cluster Cluster object
495
     */
496
    public function getCluster()
    {
        // A fresh Cluster object bound to this client is created on every call.
        $cluster = new Cluster($this);

        return $cluster;
    }
500
501
    /**
502
     * Establishes the client connections.
503
     */
504
    public function connect()
    {
        // Rebuilds the connection pool from the current config; whatever the
        // helper returns is passed through unchanged.
        $result = $this->_initConnections();

        return $result;
    }
508
509
    /**
510
     * @param \Elastica\Connection $connection
511
     *
512
     * @return $this
513
     */
514
    public function addConnection(Connection $connection)
    {
        // The pool owns the list of connections; the client only forwards.
        $pool = $this->_connectionPool;
        $pool->addConnection($connection);

        return $this;
    }
520
521
    /**
522
     * Determines whether a valid connection is available for use.
523
     *
524
     * @return bool
525
     */
526
    public function hasConnection()
    {
        // Availability is decided by the connection pool.
        $pool = $this->_connectionPool;

        return $pool->hasConnection();
    }
530
531
    /**
532
     * @throws \Elastica\Exception\ClientException
533
     *
534
     * @return \Elastica\Connection
535
     */
536
    public function getConnection()
    {
        // The pool picks the connection to use.
        $pool = $this->_connectionPool;

        return $pool->getConnection();
    }
540
541
    /**
542
     * @return \Elastica\Connection[]
543
     */
544
    public function getConnections()
    {
        // Returns the pool's connection list as the pool reports it.
        $pool = $this->_connectionPool;

        return $pool->getConnections();
    }
548
549
    /**
550
     * @return \Elastica\Connection\Strategy\StrategyInterface
551
     */
552
    public function getConnectionStrategy()
    {
        // The strategy instance is held by the connection pool.
        $pool = $this->_connectionPool;

        return $pool->getStrategy();
    }
556
557
    /**
558
     * @param array|\Elastica\Connection[] $connections
559
     *
560
     * @return $this
561
     */
562
    public function setConnections(array $connections)
    {
        // Hand the given connection list to the pool.
        $pool = $this->_connectionPool;
        $pool->setConnections($connections);

        return $this;
    }
568
569
    /**
570
     * Deletes documents with the given ids, index, type from the index.
571
     *
572
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
573
     *
574
     * @param array                  $ids     Document ids
575
     * @param string|\Elastica\Index $index   Index name
576
     * @param string|\Elastica\Type  $type    Type of documents
577
     * @param string|bool            $routing Optional routing key for all ids
578
     *
579
     * @throws \Elastica\Exception\InvalidException
580
     *
581
     * @return \Elastica\Bulk\ResponseSet Response  object
582
     */
583
    public function deleteIds(array $ids, $index, $type, $routing = false)
    {
        if (empty($ids)) {
            throw new InvalidException('Array has to consist of at least one id');
        }

        // $routing is documented as string|bool, where the boolean only serves
        // as the "no routing" default. Booleans are therefore never forwarded
        // to Action::setRouting(). The string '0' is checked explicitly because
        // empty('0') is true and the key would otherwise be dropped silently.
        $useRouting = '0' === $routing || (!is_bool($routing) && !empty($routing));

        $bulk = new Bulk($this);
        $bulk->setIndex($index);
        $bulk->setType($type);

        // One delete action per id, all sharing the same routing key (if any).
        foreach ($ids as $id) {
            $action = new Action(Action::OP_TYPE_DELETE);
            $action->setId($id);

            if ($useRouting) {
                $action->setRouting($routing);
            }

            $bulk->addAction($action);
        }

        return $bulk->send();
    }
606
607
    /**
608
     * Bulk operation.
609
     *
610
     * Every entry in the params array has to be exactly one array
611
     * of the bulk operation. An example param array would be:
612
     *
613
     * array(
614
     *         array('index' => array('_index' => 'test', '_type' => 'user', '_id' => '1')),
615
     *         array('user' => array('name' => 'hans')),
616
     *         array('delete' => array('_index' => 'test', '_type' => 'user', '_id' => '2'))
617
     * );
618
     *
619
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
620
     *
621
     * @param array $params Parameter array
622
     *
623
     * @throws \Elastica\Exception\ResponseException
624
     * @throws \Elastica\Exception\InvalidException
625
     *
626
     * @return \Elastica\Bulk\ResponseSet Response object
627
     */
628
    public function bulk(array $params)
    {
        // An empty bulk request is rejected up front.
        if (0 === count($params)) {
            throw new InvalidException('Array has to consist of at least one param');
        }

        // The raw entries are handed to Bulk unchanged and sent as one batch.
        $rawBulk = new Bulk($this);
        $rawBulk->addRawData($params);

        return $rawBulk->send();
    }
640
641
    /**
642
     * Makes calls to the elasticsearch server based on this index.
643
     *
644
     * It's possible to make any REST query directly over this method
645
     *
646
     * @param string       $path   Path to call
647
     * @param string       $method Rest method to use (GET, POST, DELETE, PUT)
648
     * @param array|string $data   OPTIONAL Arguments as array or pre-encoded string
649
     * @param array        $query  OPTIONAL Query params
650
     *
651
     * @throws Exception\ConnectionException|\Exception
652
     *
653
     * @return Response Response object
654
     */
655
    public function request($path, $method = Request::GET, $data = [], array $query = [])
    {
        $connection = $this->getConnection();

        // $data (array or pre-encoded string) is forwarded to Request unchanged.
        $request = new Request($path, $method, $data, $query, $connection);
        $this->_lastRequest = $request;
        $this->_lastResponse = null;

        try {
            $response = $request->send();
            $this->_lastResponse = $response;
        } catch (ConnectionException $e) {
            // Report the failure to the pool, then log it.
            $this->_connectionPool->onFail($connection, $e, $this);
            $this->_log($e);

            // Retry while the pool still reports a usable connection; otherwise
            // rethrow the exception that caused the last failure.
            if ($this->hasConnection()) {
                return $this->request($path, $method, $data, $query);
            }

            throw $e;
        }

        $this->_log($request);

        return $response;
    }
680
681
    /**
682
     * logging.
683
     *
684
     * @deprecated Overwriting Client->_log is deprecated. Handle logging functionality by using a custom LoggerInterface.
685
     *
686
     * @param mixed $context
687
     */
688
    protected function _log($context)
    {
        if ($context instanceof ConnectionException) {
            // Failed request: logged at error level together with the retry outlook.
            $failure = [
                'exception' => $context,
                'request' => $context->getRequest()->toArray(),
                'retry' => $this->hasConnection(),
            ];
            $this->_logger->error('Elastica Request Failure', $failure);
        } elseif ($context instanceof Request) {
            // Completed request: logged at debug level with the last response, if one is stored.
            $details = [
                'request' => $context->toArray(),
                'response' => $this->_lastResponse ? $this->_lastResponse->getData() : null,
                'responseStatus' => $this->_lastResponse ? $this->_lastResponse->getStatus() : null,
            ];
            $this->_logger->debug('Elastica Request', $details);
        } else {
            // Anything else is logged as a plain message.
            $this->_logger->debug('Elastica Request', [
                'message' => $context,
            ]);
        }
    }
714
715
    /**
716
     * Optimizes all search indices.
717
     *
718
     * @param array $args OPTIONAL Optional arguments
719
     *
720
     * @return \Elastica\Response Response object
721
     *
722
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-optimize.html
723
     */
724
    public function optimizeAll($args = [])
    {
        // POST to _optimize with an empty body; $args travel as query parameters.
        $emptyBody = [];

        return $this->request('_optimize', Request::POST, $emptyBody, $args);
    }
728
729
    /**
730
     * Refreshes all search indices.
731
     *
732
     * @return \Elastica\Response Response object
733
     *
734
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
735
     */
736
    public function refreshAll()
    {
        // POST to _refresh without body or query parameters.
        $response = $this->request('_refresh', Request::POST);

        return $response;
    }
740
741
    /**
742
     * @return Request|null
743
     */
744
    public function getLastRequest()
    {
        // Set by request(); stays null until the first request has been created.
        $lastRequest = $this->_lastRequest;

        return $lastRequest;
    }
748
749
    /**
750
     * @return Response|null
751
     */
752
    public function getLastResponse()
    {
        // request() resets this to null before sending and stores the response on success.
        $lastResponse = $this->_lastResponse;

        return $lastResponse;
    }
756
757
    /**
758
     * Replace the existing logger.
759
     *
760
     * @param LoggerInterface $logger
761
     *
762
     * @return $this
763
     */
764
    public function setLogger(LoggerInterface $logger)
    {
        // The previous logger is simply replaced; nothing is carried over.
        $this->_logger = $logger;

        return $this;
    }
770
}
771