BulkDataProvider   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 193
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 17
lcom 1
cbo 4
dl 0
loc 193
ccs 69
cts 69
cp 1
rs 10
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A run() 0 6 1
A initialize() 0 10 1
populate() 0 1 ?
A commit() 0 8 1
A index() 0 4 1
A addBulkAction() 0 11 1
A addBulkData() 0 14 2
A incrementBulk() 0 8 2
A shouldFlushBulk() 0 6 2
A flushBulk() 0 7 1
A delete() 0 4 1
A create() 0 4 1
A update() 0 4 1
A count() 0 4 1
A changeBulkSize() 0 6 1
1
<?php
2
3
namespace GBProd\ElasticsearchDataProviderBundle\DataProvider;
4
5
use Elasticsearch\Client;
6
use GBProd\ElasticsearchDataProviderBundle\Event\HasProvidedDocument;
7
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
8
9
/**
10
 * Abstract class for data providing
11
 *
12
 * @author gbprod <[email protected]>
13
 */
14
abstract class BulkDataProvider implements DataProvider
{
    /**
     * Number of actions buffered before a bulk request is sent, unless
     * overridden through changeBulkSize().
     */
    const DEFAULT_BULK_SIZE = 1000;

    /**
     * Elasticsearch client used to send bulk requests and refresh the index.
     *
     * @var Client
     */
    private $client;

    /**
     * Name of the index written into every bulk action.
     *
     * @var string
     */
    private $index;

    /**
     * Name of the type written into every bulk action.
     *
     * @var string
     */
    private $type;

    /**
     * Dispatcher notified each time a document action has been queued.
     *
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * Number of queued actions that triggers a flush.
     *
     * @var int
     */
    private $bulkSize = self::DEFAULT_BULK_SIZE;

    /**
     * Pending bulk request, shaped as ['body' => [...]].
     *
     * @var array
     */
    private $currentBulk;

    /**
     * Number of actions currently queued in $currentBulk.
     *
     * @var int
     */
    private $currentBulkSize;

    /**
     * {@inheritdoc}
     */
    public function run(Client $client, $index, $type, EventDispatcherInterface $dispatcher)
    {
        $this->initialize($client, $index, $type, $dispatcher);
        $this->populate();
        $this->commit();
    }

    /**
     * Store the collaborators of the current run and start with an empty bulk.
     *
     * @param Client                   $client
     * @param string                   $index
     * @param string                   $type
     * @param EventDispatcherInterface $dispatcher
     */
    private function initialize(Client $client, $index, $type, EventDispatcherInterface $dispatcher)
    {
        $this->client     = $client;
        $this->index      = $index;
        $this->type       = $type;
        $this->dispatcher = $dispatcher;

        $this->currentBulkSize = 0;
        $this->currentBulk     = ['body' => []];
    }

    /**
     * Populate
     *
     * Implementations queue their documents here by calling index(),
     * create(), update() and delete().
     *
     * @return null
     */
    abstract protected function populate();

    /**
     * Send whatever is still queued, then refresh the index.
     */
    private function commit()
    {
        $this->flushBulk();

        $this->client->indices()->refresh([
            'index' => $this->index,
        ]);
    }

    /**
     * Index document
     *
     * @param string $id
     * @param array  $body
     */
    public function index($id, array $body)
    {
        $this->addBulkAction('index', $id, $body);
    }

    /**
     * Queue one bulk action, flush if the bulk is full and dispatch the
     * "elasticsearch.has_provided_document" event for the document id.
     *
     * @param string     $action Bulk action name (index, create, update, delete)
     * @param string     $id
     * @param array|null $body   Action payload, null for actions without one
     */
    private function addBulkAction($action, $id, array $body = null)
    {
        $this->addBulkData($action, $id, $body);

        $this->incrementBulk();

        $this->dispatcher->dispatch(
            'elasticsearch.has_provided_document',
            new HasProvidedDocument($id)
        );
    }

    /**
     * Append the action metadata line, and the payload line when there is
     * one, to the pending bulk body.
     *
     * @param string     $action
     * @param string     $id
     * @param array|null $body
     */
    private function addBulkData($action, $id, array $body = null)
    {
        $this->currentBulk['body'][] = [
            $action => [
                '_index' => $this->index,
                '_type'  => $this->type,
                '_id'    => $id,
            ]
        ];

        if (null !== $body) {
            $this->currentBulk['body'][] = $body;
        }
    }

    /**
     * Count the action that was just queued and flush once the bulk is full.
     *
     * The counter is incremented before the threshold check so it always
     * matches the number of actions held in the buffer: a request carries
     * exactly $bulkSize actions and the counter returns to zero together
     * with the buffer.
     */
    private function incrementBulk()
    {
        $this->currentBulkSize++;

        if ($this->shouldFlushBulk()) {
            $this->flushBulk();
        }
    }

    /**
     * Tell whether the number of queued actions has reached the bulk size.
     *
     * @return bool
     */
    private function shouldFlushBulk()
    {
        return $this->currentBulkSize >= $this->bulkSize
            && $this->currentBulkSize > 0
        ;
    }

    /**
     * Send the pending bulk to Elasticsearch and reset the buffer.
     *
     * Nothing is sent when no action is queued: Elasticsearch rejects a bulk
     * request that contains no action, which would otherwise happen on
     * commit() when populate() queued nothing or when the last flush already
     * emptied the buffer.
     */
    private function flushBulk()
    {
        if (!empty($this->currentBulk['body'])) {
            $this->client->bulk($this->currentBulk);
        }

        $this->currentBulkSize = 0;
        $this->currentBulk     = ['body' => []];
    }

    /**
     * Delete document
     *
     * @param string $id
     */
    public function delete($id)
    {
        $this->addBulkAction('delete', $id);
    }

    /**
     * Create a document
     *
     * @param string $id
     * @param array  $body
     */
    public function create($id, array $body)
    {
        $this->addBulkAction('create', $id, $body);
    }

    /**
     * Update a document
     *
     * The given body is sent as a partial document, wrapped under the "doc" key.
     *
     * @param string $id
     * @param array  $body
     */
    public function update($id, array $body)
    {
        $this->addBulkAction('update', $id, ['doc' => $body]);
    }

    /**
     * {@inheritdoc}
     */
    public function count()
    {
        // This base class does not know how many documents will be provided.
        return null;
    }

    /**
     * Change bulk size
     *
     * @param int $bulkSize Number of queued actions that triggers a flush
     *
     * @return self
     */
    public function changeBulkSize($bulkSize = self::DEFAULT_BULK_SIZE)
    {
        $this->bulkSize = $bulkSize;

        return $this;
    }
}
207