Completed
Push — master ( 171b4c...103bf7 )
by GBProd
02:19
created

BulkProvider::hasBulk()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 0
loc 5
ccs 3
cts 3
cp 1
rs 9.4285
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace GBProd\ElasticaProviderBundle\Provider;
4
5
use Elastica\Client;
6
use GBProd\ElasticaProviderBundle\Event\HasProvidedDocument;
7
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
8
9
/**
 * Abstract class for data providing
 *
 * Buffers index/create/update/delete actions requested by the concrete
 * provider (from its populate() implementation) and hands them to the
 * Elastica client in bulk requests of at most `bulkSize` actions each.
 *
 * @author gbprod <[email protected]>
 */
abstract class BulkProvider implements Provider
{
    const DEFAULT_BULK_SIZE = 1000;

    /**
     * Client used to send bulk requests and to refresh indices.
     *
     * @var Client
     */
    private $client;

    /**
     * Value written to the `_index` key of every buffered action.
     *
     * @var string
     */
    private $index;

    /**
     * Value written to the `_type` key of every buffered action.
     *
     * @var string
     */
    private $type;

    /**
     * Dispatcher notified each time an action has been buffered.
     *
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * Number of buffered actions that triggers a flush.
     *
     * @var int
     */
    private $bulkSize = self::DEFAULT_BULK_SIZE;

    /**
     * Pending bulk payload, shaped as `['body' => [...]]`.
     *
     * @var array
     */
    private $currentBulk;

    /**
     * Number of actions currently held in $currentBulk.
     *
     * @var int
     */
    private $currentBulkSize;

    /**
     * {@inheritdoc}
     */
    public function run(Client $client, $index, $type, EventDispatcherInterface $dispatcher)
    {
        $this->initialize($client, $index, $type, $dispatcher);
        $this->populate();
        $this->commit();
    }

    /**
     * Store the collaborators for this run and start from an empty buffer.
     *
     * @param Client                   $client
     * @param string                   $index
     * @param string                   $type
     * @param EventDispatcherInterface $dispatcher
     */
    private function initialize(Client $client, $index, $type, EventDispatcherInterface $dispatcher)
    {
        $this->client     = $client;
        $this->index      = $index;
        $this->type       = $type;
        $this->dispatcher = $dispatcher;

        $this->resetBulk();
    }

    /**
     * Populate
     *
     * Implemented by concrete providers; called by run() between the
     * initialization and the final commit. Its return value is not used.
     *
     * @return null
     */
    abstract protected function populate();

    /**
     * Send whatever is still buffered, then refresh all indices.
     */
    private function commit()
    {
        if ($this->hasBulk()) {
            $this->flushBulk();
        }

        $this->client->refreshAll();
    }

    /**
     * Index document
     *
     * @param string $id
     * @param array  $body
     */
    public function index($id, array $body)
    {
        $this->addBulkAction('index', $id, $body);
    }

    /**
     * Delete document
     *
     * @param string $id
     */
    public function delete($id)
    {
        $this->addBulkAction('delete', $id);
    }

    /**
     * Create a document
     *
     * @param string $id
     * @param array  $body
     */
    public function create($id, array $body)
    {
        $this->addBulkAction('create', $id, $body);
    }

    /**
     * Update a document
     *
     * The given body is wrapped in a `doc` key before being buffered.
     *
     * @param string $id
     * @param array  $body
     */
    public function update($id, array $body)
    {
        $this->addBulkAction('update', $id, ['doc' => $body]);
    }

    /**
     * {@inheritdoc}
     */
    public function count()
    {
        return null;
    }

    /**
     * Change bulk size
     *
     * @param int $bulkSize
     *
     * @return self
     */
    public function changeBulkSize($bulkSize = self::DEFAULT_BULK_SIZE)
    {
        $this->bulkSize = $bulkSize;

        return $this;
    }

    /**
     * Buffer one action, flush if the threshold is reached, then notify
     * listeners that a document has been provided.
     *
     * @param string     $action
     * @param string     $id
     * @param array|null $body
     */
    private function addBulkAction($action, $id, array $body = null)
    {
        $this->addBulkData($action, $id, $body);

        $this->incrementBulk();

        $this->dispatcher->dispatch(
            'elasticsearch.has_provided_document',
            new HasProvidedDocument($id)
        );
    }

    /**
     * Append the action entry, and the body entry when there is one, to
     * the pending bulk payload.
     *
     * @param string     $action
     * @param string     $id
     * @param array|null $body
     */
    private function addBulkData($action, $id, array $body = null)
    {
        $this->currentBulk['body'][] = [
            $action => [
                '_index' => $this->index,
                '_type'  => $this->type,
                '_id'    => $id,
            ]
        ];

        if (null !== $body) {
            $this->currentBulk['body'][] = $body;
        }
    }

    /**
     * Account for the action that has just been buffered and flush once
     * the configured bulk size is reached.
     *
     * The counter is incremented BEFORE the threshold test. Testing first
     * and incrementing afterwards let the buffer grow to bulkSize + 1
     * actions and, worse, left the counter at 1 with an empty buffer right
     * after a flush, so commit() could send an empty bulk request.
     */
    private function incrementBulk()
    {
        $this->currentBulkSize++;

        if ($this->shouldFlushBulk()) {
            $this->flushBulk();
        }
    }

    /**
     * Whether the number of buffered actions has reached the bulk size.
     *
     * @return bool
     */
    private function shouldFlushBulk()
    {
        return $this->currentBulkSize >= $this->bulkSize;
    }

    /**
     * Whether at least one action is waiting to be sent.
     *
     * @return bool
     */
    private function hasBulk()
    {
        return $this->currentBulkSize > 0;
    }

    /**
     * Send the pending payload through the client and empty the buffer.
     */
    private function flushBulk()
    {
        $this->client->bulk($this->currentBulk);

        $this->resetBulk();
    }

    /**
     * Reset the buffer and its counter together so they cannot drift apart.
     */
    private function resetBulk()
    {
        $this->currentBulkSize = 0;
        $this->currentBulk     = ['body' => []];
    }
}
213