BulkProvider   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 197
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 19
lcom 1
cbo 3
dl 0
loc 197
ccs 71
cts 71
cp 1
rs 10
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A run() 0 6 1
A initialize() 0 10 1
populate() 0 1 ?
A commit() 0 7 2
A index() 0 4 1
A addBulkAction() 0 11 1
A addBulkData() 0 14 2
A incrementBulk() 0 8 2
A shouldFlushBulk() 0 6 2
A hasBulk() 0 4 1
A flushBulk() 0 7 1
A delete() 0 4 1
A create() 0 4 1
A update() 0 4 1
A count() 0 4 1
A changeBulkSize() 0 6 1
1
<?php
2
3
namespace GBProd\ElasticaProviderBundle\Provider;
4
5
use Elastica\Client;
6
use GBProd\ElasticaProviderBundle\Event\HasProvidedDocument;
7
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
8
9
/**
10
 * Abstract class for data providing
11
 *
12
 * @author gbprod <[email protected]>
13
 */
14
abstract class BulkProvider implements Provider
15
{
16
    const DEFAULT_BULK_SIZE = 1000;
17
18
    /**
19
     * @var Client
20
     */
21
    private $client;
22
23
    /**
24
     * @var string
25
     */
26
    private $index;
27
28
    /**
29
     * @var string
30
     */
31
    private $type;
32
33
    /**
34
     * @var EventDispatcherInterface
35
     */
36
    private $dispatcher;
37
38
    /**
39
     * @var int
40
     */
41
    private $bulkSize = self::DEFAULT_BULK_SIZE;
42
43
    /**
44
     * @var array
45
     */
46
    private $currentBulk;
47
48
    /**
49
     * @var int
50
     */
51
    private $currentBulkSize;
52
53
    /**
54
     * {@inheritdoc}
55
     */
56 6
    public function run(Client $client, $index, $type, EventDispatcherInterface $dispatcher)
57
    {
58 6
        $this->initialize($client, $index, $type, $dispatcher);
59 6
        $this->populate();
60 6
        $this->commit();
61 6
    }
62
63 6
    private function initialize(Client $client, $index, $type, EventDispatcherInterface $dispatcher)
64
    {
65 6
        $this->client     = $client;
66 6
        $this->index      = $index;
67 6
        $this->type       = $type;
68 6
        $this->dispatcher = $dispatcher;
69
70 6
        $this->currentBulkSize = 0;
71 6
        $this->currentBulk     = [];
72 6
    }
73
74
    /**
75
     * Populate
76
     *
77
     * @return null
78
     */
79
    abstract protected function populate();
80
81 6
    private function commit()
82
    {
83 6
        if ($this->hasBulk()) {
84 5
            $this->flushBulk();
85 5
            $this->client->refreshAll();
86 5
        }
87 6
    }
88
89
    /**
90
     * Index document
91
     *
92
     * @param string $id
93
     * @param array  $body
94
     */
95 2
    public function index($id, array $body)
96
    {
97 2
        $this->addBulkAction('index', $id, $body);
98 2
    }
99
100 5
    private function addBulkAction($action, $id, array $body = null)
101
    {
102 5
        $this->addBulkData($action, $id, $body);
103
104 5
        $this->incrementBulk();
105
106 5
        $this->dispatcher->dispatch(
107 5
            'elasticsearch.has_provided_document',
108 5
            new HasProvidedDocument($id)
109 5
        );
110 5
    }
111
112 5
    private function addBulkData($action, $id, array $body = null)
113
    {
114 5
        $this->currentBulk[] = [
115
            $action => [
116 5
                '_index' => $this->index,
117 5
                '_type'  => $this->type,
118 5
                '_id'    => $id,
119
            ]
120 5
        ];
121
122 5
        if (null !== $body) {
123 4
            $this->currentBulk[] = $body;
124 4
        }
125 5
    }
126
127 5
    private function incrementBulk()
128
    {
129 5
        if ($this->shouldFlushBulk()) {
130 1
            $this->flushBulk();
131 1
        }
132
133 5
        $this->currentBulkSize++;
134 5
    }
135
136 5
    private function shouldFlushBulk()
137
    {
138 5
        return $this->currentBulkSize >= $this->bulkSize
139 5
            && $this->hasBulk()
140 5
        ;
141
    }
142
143 6
    private function hasBulk()
144
    {
145 6
        return $this->currentBulkSize > 0;
146
    }
147
148 5
    private function flushBulk()
149
    {
150 5
        $this->client->bulk($this->currentBulk);
151
152 5
        $this->currentBulkSize = 0;
153 5
        $this->currentBulk     = [];
154 5
    }
155
156
157
    /**
158
     * Delete document
159
     *
160
     * @param string $id
161
     */
162 1
    public function delete($id)
163
    {
164 1
        $this->addBulkAction('delete', $id);
165 1
    }
166
167
    /**
168
     * Create a document
169
     *
170
     * @param string $id
171
     * @param array  $body
172
     */
173 1
    public function create($id, array $body)
174
    {
175 1
        $this->addBulkAction('create', $id, $body);
176 1
    }
177
178
    /**
179
     * Update a document
180
     *
181
     * @param string $id
182
     * @param array  $body
183
     */
184 1
    public function update($id, array $body)
185
    {
186 1
        $this->addBulkAction('update', $id, ['doc' => $body]);
187 1
    }
188
189
    /**
190
     * {@inheritdoc}
191
     */
192 1
    public function count()
193
    {
194 1
        return null;
195
    }
196
197
    /**
198
     * Change bulk size
199
     *
200
     * @param int $bulkSize
201
     *
202
     * @return self
203
     */
204 1
    public function changeBulkSize($bulkSize = self::DEFAULT_BULK_SIZE)
205
    {
206 1
        $this->bulkSize = $bulkSize;
207
208 1
        return $this;
209
    }
210
}
211