Completed
Push — master ( 712720...2189d0 )
by Juuso
02:53
created

DataLoader::dispatchQueueBatch()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 23
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 23
ccs 15
cts 15
cp 1
rs 9.0856
c 0
b 0
f 0
cc 2
eloc 14
nc 2
nop 1
crap 2
1
<?php
2
3
namespace leinonen\DataLoader;
4
5
use React\Promise\Promise;
6
use React\EventLoop\LoopInterface;
7
8
class DataLoader implements DataLoaderInterface
9
{
10
    /**
     * The batch loading function passed to the constructor. It is invoked
     * with an array of keys whenever a batch of the queue is dispatched.
     *
     * @var callable
     */
13
    private $batchLoadFunction;
14
15
    /**
     * Loads waiting to be dispatched. Each item is an array holding the
     * requested 'key' together with the 'resolve' and 'reject' callables
     * of the promise created for that load.
     *
     * @var array
     */
18
    private $promiseQueue = [];
19
20
    /**
     * Cache of the promises handed out by this loader, addressed by load key.
     *
     * @var CacheMapInterface
     */
23
    private $promiseCache;
24
25
    /**
     * Event loop whose next tick is used to dispatch the queue when batching is enabled.
     *
     * @var LoopInterface
     */
28
    private $eventLoop;
29
30
    /**
     * Options consulted for the caching flag, the batching flag and the maximum batch size.
     *
     * @var DataLoaderOptions
     */
33
    private $options;
34
35
    /**
     * Creates a DataLoader around the given batch loading function.
     *
     * @param callable $batchLoadFunction Invoked with an array of keys for every dispatched batch.
     * It is expected to return a Promise which resolves to an array of values.
     * @param LoopInterface $loop Event loop kept for scheduling dispatches.
     * @param CacheMapInterface $cacheMap Storage for the promises handed out by this loader.
     * @param null|DataLoaderOptions $options Loader options; a default DataLoaderOptions
     * instance is created when none is given.
     */
    public function __construct(
        callable $batchLoadFunction,
        LoopInterface $loop,
        CacheMapInterface $cacheMap,
        DataLoaderOptions $options = null
    ) {
        // The type hint only admits null or a DataLoaderOptions instance,
        // so a null check is all that is needed to apply the default.
        if ($options === null) {
            $options = new DataLoaderOptions();
        }

        $this->options = $options;
        $this->promiseCache = $cacheMap;
        $this->eventLoop = $loop;
        $this->batchLoadFunction = $batchLoadFunction;
    }
55
56
    /**
     * {@inheritdoc}
     *
     * When caching is enabled and the cache already holds an entry for the key,
     * that entry is returned. Otherwise a new promise is created, its resolve and
     * reject callables are appended to the queue, and the promise is stored in the
     * cache if caching is enabled.
     *
     * @throws \InvalidArgumentException When the key is null.
     */
    public function load($key)
    {
        if ($key === null) {
            throw new \InvalidArgumentException(self::class . '::load must be called with a value, but got null');
        }

        $shouldCache = $this->options->shouldCache();

        if ($shouldCache) {
            // Query the cache once and reuse the result instead of looking the key up twice.
            $cachedPromise = $this->promiseCache->get($key);

            if ($cachedPromise) {
                return $cachedPromise;
            }
        }

        $promise = new Promise(
            function (callable $resolve, callable $reject) use ($key) {
                $this->promiseQueue[] = [
                    'key' => $key,
                    'resolve' => $resolve,
                    'reject' => $reject,
                ];

                // Only the item that makes the queue non-empty schedules a dispatch;
                // items queued afterwards are picked up by that same dispatch.
                if (\count($this->promiseQueue) === 1) {
                    $this->scheduleDispatch();
                }
            }
        );

        if ($shouldCache) {
            $this->promiseCache->set($key, $promise);
        }

        return $promise;
    }
89
90
    /**
     * {@inheritdoc}
     *
     * Calls load() for each of the given keys and combines the resulting
     * promises with \React\Promise\all().
     */
    public function loadMany(array $keys)
    {
        $promises = [];

        // Keep the caller's array keys so the combined result lines up with the input.
        foreach ($keys as $index => $key) {
            $promises[$index] = $this->load($key);
        }

        return \React\Promise\all($promises);
    }
104
105
    /**
     * {@inheritdoc}
     *
     * Deletes the entry for the given key from the promise cache and
     * returns this loader so calls can be chained.
     */
    public function clear($key)
    {
        $cache = $this->promiseCache;
        $cache->delete($key);

        return $this;
    }
114
115
    /**
     * {@inheritdoc}
     *
     * Clears the whole promise cache and returns this loader so calls can be chained.
     */
    public function clearAll()
    {
        $cache = $this->promiseCache;
        $cache->clear();

        return $this;
    }
124
125
    /**
     * {@inheritdoc}
     *
     * Stores a settled promise for the key in the promise cache unless the
     * cache already holds an entry for it. Returns this loader for chaining.
     */
    public function prime($key, $value)
    {
        if ($this->promiseCache->get($key)) {
            // The key already has an entry; it is left untouched.
            return $this;
        }

        // An Exception value is stored as a rejected promise so that priming
        // matches the behavior of load($key).
        if ($value instanceof \Exception) {
            $promise = \React\Promise\reject($value);
        } else {
            $promise = \React\Promise\resolve($value);
        }

        $this->promiseCache->set($key, $promise);

        return $this;
    }
140
141
    /**
     * Arranges for the queue to be dispatched.
     *
     * With batching disabled the queue is dispatched right away; otherwise the
     * dispatch is registered for the next tick of the event loop.
     *
     * @return void
     */
    private function scheduleDispatch()
    {
        if (! $this->options->shouldBatch()) {
            $this->dispatchQueue();

            return;
        }

        $this->eventLoop->nextTick(function () {
            $this->dispatchQueue();
        });
    }
161
162
    /**
     * Empties the loader's queue and dispatches what was in it.
     *
     * The queue goes out as a single batch unless a positive maximum batch size
     * is configured and the queue holds more items than that maximum, in which
     * case it is split into several batches.
     *
     * @return void
     */
    private function dispatchQueue()
    {
        // Take a snapshot and reset the queue before dispatching, so loads made
        // from here on start a fresh queue.
        $pending = $this->promiseQueue;
        $this->promiseQueue = [];

        $limit = $this->options->getMaxBatchSize();
        $exceedsLimit = $limit !== null && $limit > 0 && $limit < \count($pending);

        if (! $exceedsLimit) {
            $this->dispatchQueueBatch($pending);

            return;
        }

        $this->dispatchQueueInMultipleBatches($pending, $limit);
    }
180
181
    /**
     * Dispatches a batch of a queue. The given batch can also be the whole queue.
     *
     * The batch load function is called with the keys of the batch. Every queued
     * promise of the batch is rejected when the function throws an Exception, when
     * it does not return a thenable, when the returned promise is rejected, or when
     * the resolved values fail validation. Otherwise the queued promises are settled
     * from the resolved values.
     *
     * @param array $batch Queue items, each holding 'key', 'resolve' and 'reject'.
     *
     * @return void
     */
    private function dispatchQueueBatch($batch)
    {
        $keys = \array_column($batch, 'key');
        $batchLoadFunction = $this->batchLoadFunction;

        try {
            // The call sits inside the try block so that an Exception thrown by the
            // batch load function rejects the batch instead of escaping and leaving
            // the queued promises unsettled.
            $batchPromise = $batchLoadFunction($keys);
            $this->validateBatchPromise($batchPromise);
        } catch (\Exception $exception) {
            $this->handleFailedDispatch($batch, $exception);

            return;
        }

        $batchPromise->then(
            function ($values) use ($batch, $keys) {
                $this->validateBatchPromiseOutput($values, $keys);
                $this->handleSuccessfulDispatch($batch, $values);
            }
        )->then(
            null,
            function ($reason) use ($batch) {
                // handleFailedDispatch() only accepts an \Exception. Any other rejection
                // reason is wrapped so the batch is still rejected rather than the call
                // failing on the type hint.
                if (! $reason instanceof \Exception) {
                    $type = \is_object($reason) ? \get_class($reason) : \gettype($reason);
                    $reason = new DataLoaderException(
                        self::class . ' batch load function returned a Promise which was rejected ' .
                        \sprintf('with a reason that is not an Exception: %s.', $type)
                    );
                }

                $this->handleFailedDispatch($batch, $reason);
            }
        );
    }
209
210
    /**
     * Splits the given queue into consecutive batches of at most $maxBatchSize
     * items and dispatches each batch in order.
     *
     * @param array $queue
     * @param int $maxBatchSize
     *
     * @return void
     */
    private function dispatchQueueInMultipleBatches(array $queue, $maxBatchSize)
    {
        foreach (\array_chunk($queue, $maxBatchSize) as $batch) {
            $this->dispatchQueueBatch($batch);
        }
    }
228
229
    /**
     * Settles every queued promise of the batch with the value found at the same
     * index in $values: Exception values reject the promise, all others resolve it.
     *
     * @param array $batch
     * @param array $values
     */
    private function handleSuccessfulDispatch(array $batch, array $values)
    {
        foreach ($batch as $position => $item) {
            $result = $values[$position];
            $settle = $result instanceof \Exception ? $item['reject'] : $item['resolve'];

            $settle($result);
        }
    }
246
247
    /**
     * Rejects every queued promise of the batch with the given error.
     *
     * @param array $batch
     * @param \Exception $error
     */
    private function handleFailedDispatch(array $batch, \Exception $error)
    {
        foreach ($batch as $item) {
            // Individual loads must not stay cached when the entire batch dispatch fails.
            $this->clear($item['key']);
            $item['reject']($error);
        }
    }
261
262
    /**
     * Validates the value the batch promise resolved with.
     *
     * @param mixed $values Value the batch promise resolved with; it must be an array.
     * @param array $keys Keys which the DataLoaders load was called with
     *
     * @throws DataLoaderException When $values is not an array, or when it does not
     * contain exactly as many items as $keys.
     */
    private function validateBatchPromiseOutput($values, $keys)
    {
        if (! \is_array($values)) {
            // Message reworded to read as a sentence and to match the length-mismatch
            // message below; it still ends with the type that was received.
            throw new DataLoaderException(
                self::class . ' must be constructed with a function which accepts ' .
                'an array of keys and returns a Promise which resolves to an array of values, but ' .
                \sprintf('the function did not return a Promise of an array: %s.', \gettype($values))
            );
        }

        if (\count($values) !== \count($keys)) {
            throw new DataLoaderException(
                self::class . ' must be constructed with a function which accepts ' .
                'an array of keys and returns a Promise which resolves to an array of values, but ' .
                'the function did not return a Promise of an array of the same length as the array of keys.' .
                \sprintf("\n Keys: %s\n Values: %s\n", \count($keys), \count($values))
            );
        }
    }
287
288
    /**
     * Checks that the batch load function returned something promise-like:
     * a truthy value with a callable then() method.
     *
     * @param mixed $batchPromise Return value of the batch load function.
     *
     * @throws DataLoaderException When the value is falsy or has no callable then() method.
     */
    private function validateBatchPromise($batchPromise)
    {
        if ($batchPromise && \is_callable([$batchPromise, 'then'])) {
            return;
        }

        $message = self::class . ' must be constructed with a function which accepts ' .
            'an array of keys and returns a Promise which resolves to an array of values ' .
            \sprintf('the function returned %s.', \gettype($batchPromise));

        throw new DataLoaderException($message);
    }
305
}
306