Completed
Pull Request — master (#1803)
by Maciej
15:26 queued 06:04
created

Builder::prepareIterator()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 7
cts 7
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 7
nc 4
nop 1
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Doctrine\ODM\MongoDB\Aggregation;
6
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Doctrine\ODM\MongoDB\Iterator\CachingIterator;
9
use Doctrine\ODM\MongoDB\Iterator\HydratingIterator;
10
use Doctrine\ODM\MongoDB\Iterator\Iterator;
11
use Doctrine\ODM\MongoDB\Mapping\ClassMetadata;
12
use Doctrine\ODM\MongoDB\Persisters\DocumentPersister;
13
use Doctrine\ODM\MongoDB\Query\Expr as QueryExpr;
14
use GeoJson\Geometry\Point;
15
use MongoDB\Collection;
16
use MongoDB\Driver\Cursor;
17
use function array_map;
18
use function array_merge;
19
use function array_unshift;
20
use function assert;
21
use function is_array;
22
use function sprintf;
23
24
/**
25
 * Fluent interface for building aggregation pipelines.
26
 */
27
class Builder
28
{
29
    /**
30
     * The DocumentManager instance for this query
31
     *
32
     * @var DocumentManager
33
     */
34
    private $dm;
35
36
    /**
37
     * The ClassMetadata instance.
38
     *
39
     * @var ClassMetadata
40
     */
41
    private $class;
42
43
    /** @var string */
44
    private $hydrationClass;
45
46
    /**
47
     * The Collection instance.
48
     *
49
     * @var Collection
50
     */
51
    private $collection;
52
53
    /** @var Stage[] */
54
    private $stages = [];
55
56
    /**
57
     * Create a new aggregation builder.
58
     *
59
     * @param string $documentName
60
     */
61 254
    public function __construct(DocumentManager $dm, $documentName)
62
    {
63 254
        $this->dm = $dm;
64 254
        $this->class = $this->dm->getClassMetadata($documentName);
65 254
        $this->collection = $this->dm->getDocumentCollection($documentName);
66 254
    }
67
68
    /**
69
     * Adds new fields to documents. $addFields outputs documents that contain all
70
     * existing fields from the input documents and newly added fields.
71
     *
72
     * The $addFields stage is equivalent to a $project stage that explicitly specifies
73
     * all existing fields in the input documents and adds the new fields.
74
     *
75
     * If the name of the new field is the same as an existing field name (including _id),
76
     * $addFields overwrites the existing value of that field with the value of the
77
     * specified expression.
78
     *
79
     * @see http://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
80
     *
81
     * @return Stage\AddFields
82
     */
83 1
    public function addFields()
84
    {
85 1
        $stage = $this->addStage(new Stage\AddFields($this));
86 1
        assert($stage instanceof Stage\AddFields);
87 1
        return $stage;
88
    }
89
90
    /**
91
     * Categorizes incoming documents into groups, called buckets, based on a
92
     * specified expression and bucket boundaries.
93
     *
94
     * Each bucket is represented as a document in the output. The document for
95
     * each bucket contains an _id field, whose value specifies the inclusive
96
     * lower bound of the bucket and a count field that contains the number of
97
     * documents in the bucket. The count field is included by default when the
98
     * output is not specified.
99
     *
100
     * @see https://docs.mongodb.com/manual/reference/operator/aggregation/bucket/
101
     *
102
     * @return Stage\Bucket
103
     */
104 2
    public function bucket()
105
    {
106 2
        $stage = $this->addStage(new Stage\Bucket($this, $this->dm, $this->class));
107 2
        assert($stage instanceof Stage\Bucket);
108 2
        return $stage;
109
    }
110
111
    /**
112
     * Categorizes incoming documents into a specific number of groups, called
113
     * buckets, based on a specified expression.
114
     *
115
     * Bucket boundaries are automatically determined in an attempt to evenly
116
     * distribute the documents into the specified number of buckets. Each
117
     * bucket is represented as a document in the output. The document for each
118
     * bucket contains an _id field, whose value specifies the inclusive lower
119
     * bound and the exclusive upper bound for the bucket, and a count field
120
     * that contains the number of documents in the bucket. The count field is
121
     * included by default when the output is not specified.
122
     *
123
     * @see https://docs.mongodb.com/manual/reference/operator/aggregation/bucketAuto/
124
     *
125
     * @return Stage\BucketAuto
126
     */
127 2
    public function bucketAuto()
128
    {
129 2
        $stage = $this->addStage(new Stage\BucketAuto($this, $this->dm, $this->class));
130 2
        assert($stage instanceof Stage\BucketAuto);
131 2
        return $stage;
132
    }
133
134
    /**
135
     * Returns statistics regarding a collection or view.
136
     *
137
     * $collStats must be the first stage in an aggregation pipeline, or else
138
     * the pipeline returns an error.
139
     *
140
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/collStats/
141
     * @return Stage\CollStats
142
     */
143 1
    public function collStats()
144
    {
145 1
        $stage = $this->addStage(new Stage\CollStats($this));
146 1
        assert($stage instanceof Stage\CollStats);
147 1
        return $stage;
148
    }
149
150
    /**
151
     * Returns a document that contains a count of the number of documents input
152
     * to the stage.
153
     *
154
     * @see https://docs.mongodb.com/manual/reference/operator/aggregation/count/
155
     *
156
     * @return Stage\Count
157
     */
158 1
    public function count($fieldName)
159
    {
160 1
        $stage = $this->addStage(new Stage\Count($this, $fieldName));
161 1
        assert($stage instanceof Stage\Count);
162 1
        return $stage;
163
    }
164
165
    /**
166
     * Executes the aggregation pipeline
167
     *
168
     * @param array $options
169
     * @return Iterator
170
     */
171 17
    public function execute($options = [])
172
    {
173
        // Force cursor to be used
174 17
        $options = array_merge($options, ['cursor' => true]);
175
176 17
        $cursor = $this->collection->aggregate($this->getPipeline(), $options);
177
178 17
        return $this->prepareIterator($cursor);
179
    }
180
181
    /**
182
     * @return Expr
183
     */
184 147
    public function expr()
185
    {
186 147
        return new Expr($this->dm, $this->class);
187
    }
188
189
    /**
190
     * Processes multiple aggregation pipelines within a single stage on the
191
     * same set of input documents.
192
     *
193
     * Each sub-pipeline has its own field in the output document where its
194
     * results are stored as an array of documents.
195
     *
196
     * @return Stage\Facet
197
     */
198 1
    public function facet()
199
    {
200 1
        $stage = $this->addStage(new Stage\Facet($this));
201 1
        assert($stage instanceof Stage\Facet);
202 1
        return $stage;
203
    }
204
205
    /**
206
     * Outputs documents in order of nearest to farthest from a specified point.
207
     *
208
     * A GeoJSON point may be provided as the first and only argument for
209
     * 2dsphere queries. This single parameter may be a GeoJSON point object or
210
     * an array corresponding to the point's JSON representation. If GeoJSON is
211
     * used, the "spherical" option will default to true.
212
     *
213
     * You can only use this as the first stage of a pipeline.
214
     *
215
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/geoNear/
216
     *
217
     * @param float|array|Point $x
218
     * @param float             $y
219
     * @return Stage\GeoNear
220
     */
221 4
    public function geoNear($x, $y = null)
222
    {
223 4
        $stage = $this->addStage(new Stage\GeoNear($this, $x, $y));
224 4
        assert($stage instanceof Stage\GeoNear);
225 4
        return $stage;
226
    }
227
228
    /**
229
     * Returns the assembled aggregation pipeline
230
     *
231
     * For pipelines where the first stage is a $geoNear stage, it will apply
232
     * the document filters and discriminator queries to the query portion of
233
     * the geoNear operation. For all other pipelines, it prepends a $match stage
234
     * containing the required query.
235
     *
236
     * @return array
237
     */
238 55
    public function getPipeline()
239
    {
240 55
        $pipeline = array_map(
241
            function (Stage $stage) {
242 55
                return $stage->getExpression();
243 55
            },
244 55
            $this->stages
245
        );
246
247 55
        if ($this->getStage(0) instanceof Stage\GeoNear) {
248 4
            $pipeline[0]['$geoNear']['query'] = $this->applyFilters($pipeline[0]['$geoNear']['query']);
249
        } else {
250 51
            $matchExpression = $this->applyFilters([]);
251 51
            if ($matchExpression !== []) {
252 1
                array_unshift($pipeline, ['$match' => $matchExpression]);
253
            }
254
        }
255
256 55
        return $pipeline;
257
    }
258
259
    /**
260
     * Returns a certain stage from the pipeline
261
     *
262
     * @param int $index
263
     * @return Stage
264
     */
265 55
    public function getStage($index)
266
    {
267 55
        if (! isset($this->stages[$index])) {
268
            throw new \OutOfRangeException(sprintf('Could not find stage with index %d.', $index));
269
        }
270
271 55
        return $this->stages[$index];
272
    }
273
274
    /**
275
     * Performs a recursive search on a collection, with options for restricting
276
     * the search by recursion depth and query filter.
277
     *
278
     * @see https://docs.mongodb.org/manual/reference/operator/aggregation/graphLookup/
279
     *
280
     * @param string $from Target collection for the $graphLookup operation to
281
     * search, recursively matching the connectFromField to the connectToField.
282
     * @return Stage\GraphLookup
283
     */
284 10
    public function graphLookup($from)
285
    {
286 10
        $stage = $this->addStage(new Stage\GraphLookup($this, $from, $this->dm, $this->class));
287 9
        assert($stage instanceof Stage\GraphLookup);
288 9
        return $stage;
289
    }
290
291
    /**
292
     * Groups documents by some specified expression and outputs to the next
293
     * stage a document for each distinct grouping.
294
     *
295
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/group/
296
     *
297
     * @return Stage\Group
298
     */
299 4
    public function group()
300
    {
301 4
        $stage = $this->addStage(new Stage\Group($this));
302 4
        assert($stage instanceof Stage\Group);
303 4
        return $stage;
304
    }
305
306
    /**
307
     * Set which class to use when hydrating results as document class instances.
308
     *
309
     * @param string $className
310
     *
311
     * @return self
312
     */
313 4
    public function hydrate($className)
314
    {
315 4
        $this->hydrationClass = $className;
316
317 4
        return $this;
318
    }
319
320
    /**
321
     * Returns statistics regarding the use of each index for the collection.
322
     *
323
     * @see https://docs.mongodb.org/manual/reference/operator/aggregation/indexStats/
324
     *
325
     * @return Stage\IndexStats
326
     */
327 1
    public function indexStats()
328
    {
329 1
        $stage = $this->addStage(new Stage\IndexStats($this));
330 1
        assert($stage instanceof Stage\IndexStats);
331 1
        return $stage;
332
    }
333
334
    /**
335
     * Limits the number of documents passed to the next stage in the pipeline.
336
     *
337
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/limit/
338
     *
339
     * @param int $limit
340
     * @return Stage\Limit
341
     */
342 2
    public function limit($limit)
343
    {
344 2
        $stage = $this->addStage(new Stage\Limit($this, $limit));
345 2
        assert($stage instanceof Stage\Limit);
346 2
        return $stage;
347
    }
348
349
    /**
350
     * Performs a left outer join to an unsharded collection in the same
351
     * database to filter in documents from the “joined” collection for
352
     * processing.
353
     *
354
     * @see https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/
355
     *
356
     * @param string $from
357
     * @return Stage\Lookup
358
     */
359 14
    public function lookup($from)
360
    {
361 14
        $stage = $this->addStage(new Stage\Lookup($this, $from, $this->dm, $this->class));
362 12
        assert($stage instanceof Stage\Lookup);
363 12
        return $stage;
364
    }
365
366
    /**
367
     * Filters the documents to pass only the documents that match the specified
368
     * condition(s) to the next pipeline stage.
369
     *
370
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/match/
371
     *
372
     * @return Stage\Match
373
     */
374 8
    public function match()
375
    {
376 8
        $stage = $this->addStage(new Stage\Match($this));
377 8
        assert($stage instanceof Stage\Match);
378 8
        return $stage;
379
    }
380
381
    /**
382
     * Returns a query expression to be used in match stages
383
     *
384
     * @return QueryExpr
385
     */
386 60
    public function matchExpr()
387
    {
388 60
        $expr = new QueryExpr($this->dm);
389 60
        $expr->setClassMetadata($this->class);
390
391 60
        return $expr;
392
    }
393
394
    /**
395
     * Takes the documents returned by the aggregation pipeline and writes them
396
     * to a specified collection. This must be the last stage in the pipeline.
397
     *
398
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/out/
399
     *
400
     * @param string $from
401
     * @return Stage\Out
402
     */
403 6
    public function out($from)
404
    {
405 6
        $stage = $this->addStage(new Stage\Out($this, $from, $this->dm));
406 5
        assert($stage instanceof Stage\Out);
407 5
        return $stage;
408
    }
409
410
    /**
411
     * Passes along the documents with only the specified fields to the next
412
     * stage in the pipeline. The specified fields can be existing fields from
413
     * the input documents or newly computed fields.
414
     *
415
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/project/
416
     *
417
     * @return Stage\Project
418
     */
419 4
    public function project()
420
    {
421 4
        $stage = $this->addStage(new Stage\Project($this));
422 4
        assert($stage instanceof Stage\Project);
423 4
        return $stage;
424
    }
425
426
    /**
427
     * Restricts the contents of the documents based on information stored in
428
     * the documents themselves.
429
     *
430
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/redact/
431
     *
432
     * @return Stage\Redact
433
     */
434 2
    public function redact()
435
    {
436 2
        $stage = $this->addStage(new Stage\Redact($this));
437 2
        assert($stage instanceof Stage\Redact);
438 2
        return $stage;
439
    }
440
441
    /**
442
     * Promotes a specified document to the top level and replaces all other
443
     * fields.
444
     *
445
     * The operation replaces all existing fields in the input document,
446
     * including the _id field. You can promote an existing embedded document to
447
     * the top level, or create a new document for promotion.
448
     *
449
     * @param string|null $expression Optional. A replacement expression that
450
     * resolves to a document.
451
     *
452
     * @return Stage\ReplaceRoot
453
     */
454 6
    public function replaceRoot($expression = null)
455
    {
456 6
        $stage = $this->addStage(new Stage\ReplaceRoot($this, $this->dm, $this->class, $expression));
457 6
        assert($stage instanceof Stage\ReplaceRoot);
458 6
        return $stage;
459
    }
460
461
    /**
462
     * Randomly selects the specified number of documents from its input.
463
     *
464
     * @see https://docs.mongodb.org/manual/reference/operator/aggregation/sample/
465
     *
466
     * @param int $size
467
     * @return Stage\Sample
468
     */
469 2
    public function sample($size)
470
    {
471 2
        $stage = $this->addStage(new Stage\Sample($this, $size));
472 2
        assert($stage instanceof Stage\Sample);
473 2
        return $stage;
474
    }
475
476
    /**
477
     * Skips over the specified number of documents that pass into the stage and
478
     * passes the remaining documents to the next stage in the pipeline.
479
     *
480
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/skip/
481
     *
482
     * @param int $skip
483
     * @return Stage\Skip
484
     */
485 2
    public function skip($skip)
486
    {
487 2
        $stage = $this->addStage(new Stage\Skip($this, $skip));
488 2
        assert($stage instanceof Stage\Skip);
489 2
        return $stage;
490
    }
491
492
    /**
493
     * Sorts all input documents and returns them to the pipeline in sorted
494
     * order.
495
     *
496
     * If sorting by multiple fields, the first argument should be an array of
497
     * field name (key) and order (value) pairs.
498
     *
499
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/sort/
500
     *
501
     * @param array|string $fieldName Field name or array of field/order pairs
502
     * @param int|string   $order     Field order (if one field is specified)
503
     * @return Stage\Sort
504
     */
505 7
    public function sort($fieldName, $order = null)
506
    {
507 7
        $fields = is_array($fieldName) ? $fieldName : [$fieldName => $order];
508
        // fixme: move to sort stage
509 7
        $stage = $this->addStage(new Stage\Sort($this, $this->getDocumentPersister()->prepareSort($fields)));
510 7
        assert($stage instanceof Stage\Sort);
511 7
        return $stage;
512
    }
513
514
    /**
515
     * Groups incoming documents based on the value of a specified expression,
516
     * then computes the count of documents in each distinct group.
517
     *
518
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/sortByCount/
519
     *
520
     * @param string $expression The expression to group by
521
     * @return Stage\SortByCount
522
     */
523 3
    public function sortByCount($expression)
524
    {
525 3
        $stage = $this->addStage(new Stage\SortByCount($this, $expression, $this->dm, $this->class));
526 3
        assert($stage instanceof Stage\SortByCount);
527 3
        return $stage;
528
    }
529
530
    /**
531
     * Deconstructs an array field from the input documents to output a document
532
     * for each element. Each output document is the input document with the
533
     * value of the array field replaced by the element.
534
     *
535
     * @see http://docs.mongodb.org/manual/reference/operator/aggregation/unwind/
536
     *
537
     * @param string $fieldName The field to unwind. It is automatically prefixed with the $ sign
538
     * @return Stage\Unwind
539
     */
540 7
    public function unwind($fieldName)
541
    {
542
        // Fixme: move field name translation to stage
543 7
        $stage = $this->addStage(new Stage\Unwind($this, $this->getDocumentPersister()->prepareFieldName($fieldName)));
544 7
        assert($stage instanceof Stage\Unwind);
545 7
        return $stage;
546
    }
547
548
    /**
549
     * @return Stage
550
     */
551 61
    protected function addStage(Stage $stage)
552
    {
553 61
        $this->stages[] = $stage;
554
555 61
        return $stage;
556
    }
557
558
    /**
559
     * Applies filters and discriminator queries to the pipeline
560
     *
561
     * @param array $query
562
     * @return array
563
     */
564 55
    private function applyFilters(array $query)
565
    {
566 55
        $documentPersister = $this->dm->getUnitOfWork()->getDocumentPersister($this->class->name);
567
568 55
        $query = $documentPersister->addDiscriminatorToPreparedQuery($query);
569 55
        $query = $documentPersister->addFilterToPreparedQuery($query);
570
571 55
        return $query;
572
    }
573
574
    /**
575
     * @return DocumentPersister
576
     */
577 11
    private function getDocumentPersister()
578
    {
579 11
        return $this->dm->getUnitOfWork()->getDocumentPersister($this->class->name);
580
    }
581
582 17
    private function prepareIterator(Cursor $cursor): Iterator
583
    {
584 17
        $class = null;
585 17
        if ($this->hydrationClass) {
586 4
            $class = $this->dm->getClassMetadata($this->hydrationClass);
587
        }
588
589 17
        if ($class) {
590 4
            $cursor = new HydratingIterator($cursor, $this->dm->getUnitOfWork(), $class);
591
        }
592
593 17
        return new CachingIterator($cursor);
594
    }
595
}
596