Completed
Push — master ( 8d12ff...458df2 )
by Guillaume
02:56
created

EachPromise   B

Complexity

Total Complexity 38

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
dl 0
loc 222
rs 8.3999
c 0
b 0
f 0
wmc 38
lcom 1
cbo 1

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 16 4
A promise() 0 18 4
B createPromise() 0 29 5
D refillPending() 0 27 9
B addPending() 0 30 5
B advanceIterator() 0 24 4
A step() 0 17 4
A checkIfFinished() 0 10 3
1
<?php
2
namespace GuzzleHttp\Promise;
3
4
/**
5
 * Represents a promise that iterates over many promises and invokes
6
 * side-effect functions in the process.
7
 */
8
class EachPromise implements PromisorInterface
9
{
10
    private $pending = [];
11
12
    /** @var \Iterator */
13
    private $iterable;
14
15
    /** @var callable|int */
16
    private $concurrency;
17
18
    /** @var callable */
19
    private $onFulfilled;
20
21
    /** @var callable */
22
    private $onRejected;
23
24
    /** @var Promise */
25
    private $aggregate;
26
27
    /** @var bool */
28
    private $mutex;
29
30
    /**
31
     * Configuration hash can include the following key value pairs:
32
     *
33
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
34
     *   is invoked with three arguments: the fulfillment value, the index
35
     *   position from the iterable list of the promise, and the aggregate
36
     *   promise that manages all of the promises. The aggregate promise may
37
     *   be resolved from within the callback to short-circuit the promise.
38
     * - rejected: (callable) Invoked when a promise is rejected. The
39
     *   function is invoked with three arguments: the rejection reason, the
40
     *   index position from the iterable list of the promise, and the
41
     *   aggregate promise that manages all of the promises. The aggregate
42
     *   promise may be resolved from within the callback to short-circuit
43
     *   the promise.
44
     * - concurrency: (integer) Pass this configuration option to limit the
45
     *   allowed number of outstanding concurrently executing promises,
46
     *   creating a capped pool of promises. There is no limit by default.
47
     *
48
     * @param mixed    $iterable Promises or values to iterate.
49
     * @param array    $config   Configuration options
50
     */
51
    public function __construct($iterable, array $config = [])
52
    {
53
        $this->iterable = iter_for($iterable);
54
55
        if (isset($config['concurrency'])) {
56
            $this->concurrency = $config['concurrency'];
57
        }
58
59
        if (isset($config['fulfilled'])) {
60
            $this->onFulfilled = $config['fulfilled'];
61
        }
62
63
        if (isset($config['rejected'])) {
64
            $this->onRejected = $config['rejected'];
65
        }
66
    }
67
68
    public function promise()
69
    {
70
        if ($this->aggregate) {
71
            return $this->aggregate;
72
        }
73
74
        try {
75
            $this->createPromise();
76
            $this->iterable->rewind();
77
            $this->refillPending();
78
        } catch (\Throwable $e) {
0 ignored issues
show
Bug introduced by
The class Throwable does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
79
            $this->aggregate->reject($e);
80
        } catch (\Exception $e) {
81
            $this->aggregate->reject($e);
82
        }
83
84
        return $this->aggregate;
85
    }
86
87
    private function createPromise()
88
    {
89
        $this->mutex = false;
90
        $this->aggregate = new Promise(function () {
91
            reset($this->pending);
92
            if (empty($this->pending) && !$this->iterable->valid()) {
93
                $this->aggregate->resolve(null);
94
                return;
95
            }
96
97
            // Consume a potentially fluctuating list of promises while
98
            // ensuring that indexes are maintained (precluding array_shift).
99
            while ($promise = current($this->pending)) {
100
                next($this->pending);
101
                $promise->wait();
102
                if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
103
                    return;
104
                }
105
            }
106
        });
107
108
        // Clear the references when the promise is resolved.
109
        $clearFn = function () {
110
            $this->iterable = $this->concurrency = $this->pending = null;
0 ignored issues
show
Documentation Bug introduced by
It seems like null of type null is incompatible with the declared type array of property $pending.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
111
            $this->onFulfilled = $this->onRejected = null;
112
        };
113
114
        $this->aggregate->then($clearFn, $clearFn);
115
    }
116
117
    private function refillPending()
118
    {
119
        if (!$this->concurrency) {
120
            // Add all pending promises.
121
            while ($this->addPending() && $this->advanceIterator());
122
            return;
123
        }
124
125
        // Add only up to N pending promises.
126
        $concurrency = is_callable($this->concurrency)
127
            ? call_user_func($this->concurrency, count($this->pending))
128
            : $this->concurrency;
129
        $concurrency = max($concurrency - count($this->pending), 0);
130
        // Concurrency may be set to 0 to disallow new promises.
131
        if (!$concurrency) {
132
            return;
133
        }
134
        // Add the first pending promise.
135
        $this->addPending();
136
        // Note this is special handling for concurrency=1 so that we do
137
        // not advance the iterator after adding the first promise. This
138
        // helps work around issues with generators that might not have the
139
        // next value to yield until promise callbacks are called.
140
        while (--$concurrency
141
            && $this->advanceIterator()
142
            && $this->addPending());
143
    }
144
145
    private function addPending()
146
    {
147
        if (!$this->iterable || !$this->iterable->valid()) {
148
            return false;
149
        }
150
151
        $promise = promise_for($this->iterable->current());
152
        $idx = $this->iterable->key();
153
154
        $this->pending[$idx] = $promise->then(
155
            function ($value) use ($idx) {
156
                if ($this->onFulfilled) {
157
                    call_user_func(
158
                        $this->onFulfilled, $value, $idx, $this->aggregate
159
                    );
160
                }
161
                $this->step($idx);
162
            },
163
            function ($reason) use ($idx) {
164
                if ($this->onRejected) {
165
                    call_user_func(
166
                        $this->onRejected, $reason, $idx, $this->aggregate
167
                    );
168
                }
169
                $this->step($idx);
170
            }
171
        );
172
173
        return true;
174
    }
175
176
    private function advanceIterator()
177
    {
178
        // Place a lock on the iterator so that we ensure to not recurse,
179
        // preventing fatal generator errors.
180
        if ($this->mutex) {
181
            return false;
182
        }
183
184
        $this->mutex = true;
185
186
        try {
187
            $this->iterable->next();
188
            $this->mutex = false;
189
            return true;
190
        } catch (\Throwable $e) {
0 ignored issues
show
Bug introduced by
The class Throwable does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
191
            $this->aggregate->reject($e);
192
            $this->mutex = false;
193
            return false;
194
        } catch (\Exception $e) {
195
            $this->aggregate->reject($e);
196
            $this->mutex = false;
197
            return false;
198
        }
199
    }
200
201
    private function step($idx)
202
    {
203
        // If the promise was already resolved, then ignore this step.
204
        if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
205
            return;
206
        }
207
208
        unset($this->pending[$idx]);
209
210
        // Only refill pending promises if we are not locked, preventing the
211
        // EachPromise to recursively invoke the provided iterator, which
212
        // cause a fatal error: "Cannot resume an already running generator"
213
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
214
            // Add more pending promises if possible.
215
            $this->refillPending();
216
        }
217
    }
218
219
    private function checkIfFinished()
220
    {
221
        if (!$this->pending && !$this->iterable->valid()) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->pending of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
222
            // Resolve the promise if there's nothing left to do.
223
            $this->aggregate->resolve(null);
224
            return true;
225
        }
226
227
        return false;
228
    }
229
}
230