Passed
Pull Request — master (#24)
by Igor
02:46
created

ParallelStream::findAny()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 16
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 10
c 1
b 0
f 0
dl 0
loc 16
ccs 0
cts 11
cp 0
rs 9.9332
cc 4
nc 4
nop 0
crap 20
1
<?php
2
/**
3
 * @author Igor Pomiluyko
4
 */
5
6
namespace WS\Utils\Collections;
7
8
use Amp\Parallel\Worker\DefaultPool;
9
use Amp\Parallel\Worker\Pool;
10
use RuntimeException;
11
12
use function Amp\ParallelFunctions\parallel;
13
use function Amp\ParallelFunctions\parallelFilter;
14
use function Amp\ParallelFunctions\parallelMap;
15
use function Amp\Promise\all;
16
use function Amp\Promise\wait;
17
18
class ParallelStream implements Stream
19
{
20
    /**
21
     * @var ListSequence
22
     */
23
    private $list;
24
    /**
25
     * @var Pool|null
26
     */
27
    private $workersPool;
28
29 10
    /**
     * Builds the stream over its own list of elements and optionally attaches a worker pool.
     *
     * @param Collection    $collection  Source elements. A ListSequence is stored as its copy(); any other
     *                                   collection is poured into a new list through toArray().
     * @param Pool|int|null $workersPool A ready pool, or an integer passed to the DefaultPool constructor.
     *                                   Any other value leaves the pool property unset.
     */
    public function __construct(Collection $collection, $workersPool = null)
    {
        if (!$collection instanceof ListSequence) {
            $this->list = $this->emptyList();
            $this->list->addAll($collection->toArray());
        } else {
            $this->list = $collection->copy();
        }

        // A value cannot be both a Pool and an int, so the two checks are mutually exclusive.
        if ($workersPool instanceof Pool) {
            $this->workersPool = $workersPool;
        } elseif (is_int($workersPool)) {
            $this->workersPool = new DefaultPool($workersPool);
        }
    }
45
46
    /**
     * @inheritDoc
     *
     * Every element is submitted to the worker pool together with its zero-based position, and the call
     * waits for all submitted tasks before returning.
     */
    public function each(callable $consumer): Stream
    {
        // The parallel wrapper does not depend on the element, so it is created once and reused.
        // It is created lazily so that an empty stream never touches the consumer at all.
        $task = null;
        $promises = [];
        $position = 0;
        foreach ($this->list as $item) {
            if ($task === null) {
                $task = parallel($consumer, $this->workersPool);
            }
            $promises[] = $task($item, $position++);
        }
        wait(all($promises));

        return $this;
    }
60
61
    /**
     * @inheritDoc
     *
     * The predicate is evaluated through parallelFilter() with the stream's worker pool; the elements the
     * resulting promise resolves with become the new content of the stream.
     *
     * @param mixed $workers Not read by this implementation.
     */
    public function filter(callable $predicate, $workers = null): Stream
    {
        $pending = parallelFilter($this->list->toArray(), $predicate, 0, $this->workersPool);

        // The list is swapped for an empty one before the promise is awaited.
        $this->list = $this->emptyList();
        $accepted = wait($pending);
        $this->list->addAll($accepted);

        return $this;
    }
72
73
    /**
     * Replaces the stream content with the collection the reorganizer builds from a copy of the current list.
     *
     * @param callable $reorganizer Called with a copy of the current list; must return a Collection.
     *
     * @return Stream This stream.
     *
     * @throws RuntimeException When the reorganizer returns anything other than a Collection.
     */
    public function reorganize(callable $reorganizer): Stream
    {
        $snapshot = $this->list->copy();
        $produced = $reorganizer($snapshot);

        if ($produced instanceof Collection) {
            $this->list = $produced;

            return $this;
        }

        throw new RuntimeException('Result set of reorganizer call must be instance of Collection interface');
    }
83
84
    /**
     * @inheritDoc
     *
     * The predicate is called directly, element by element, and evaluation stops at the first element
     * it rejects. An empty stream yields true.
     */
    public function allMatch(callable $predicate): bool
    {
        $everyElementMatches = true;
        foreach ($this->list as $element) {
            if ($predicate($element)) {
                continue;
            }
            $everyElementMatches = false;
            break;
        }

        return $everyElementMatches;
    }
96
97
    /**
     * @inheritDoc
     *
     * The predicate is called directly, element by element, and evaluation stops at the first element
     * it accepts. An empty stream yields false.
     */
    public function anyMatch(callable $predicate): bool
    {
        $found = false;
        foreach ($this->list as $element) {
            if (!$predicate($element)) {
                continue;
            }
            $found = true;
            break;
        }

        return $found;
    }
110
111
    /**
     * @inheritDoc
     *
     * The converter is applied through parallelMap() with the stream's worker pool; the values the
     * resulting promise resolves with become the new content of the stream.
     */
    public function map(callable $converter): Stream
    {
        $pending = parallelMap($this->list->toArray(), $converter, $this->workersPool);

        // The list is swapped for an empty one before the promise is awaited.
        $this->list = $this->emptyList();
        $converted = wait($pending);
        $this->list->addAll($converted);

        return $this;
    }
122
123
    /**
     * @inheritDoc
     *
     * Sorting is done with usort() on an array taken from a copy of the current list; the sorted
     * elements are then added one by one to a new list.
     */
    public function sort(callable $comparator): Stream
    {
        $snapshot = $this->getCollection();
        $this->list = $this->emptyList();

        $ordered = $snapshot->toArray();
        usort($ordered, $comparator);

        foreach ($ordered as $element) {
            $this->list->add($element);
        }

        return $this;
    }
139
140
    /**
     * Orders the elements ascending by the scalar value the extractor returns for each of them.
     *
     * Elements whose extracted values have the same string form are kept together, in their original
     * relative order.
     *
     * @param callable $extractor Called with one element; must return a scalar.
     *
     * @return Stream This stream.
     *
     * @throws RuntimeException When the extractor returns a non-scalar value.
     */
    public function sortBy(callable $extractor): Stream
    {
        $values = [];
        $groups = [];
        foreach ($this->list as $item) {
            $value = $extractor($item);
            if (!is_scalar($value)) {
                throw new RuntimeException('Only scalar value can be as result of sort extractor');
            }

            // Groups are keyed by the string form of the value. The same form is used for the lookup
            // below, so floats and booleans find their group instead of being cast to another key.
            $key = $value . '';
            if (!isset($groups[$key])) {
                // Each distinct key is recorded once; otherwise its whole group would be appended
                // once per element and the elements would be duplicated.
                $values[] = $value;
            }
            $groups[$key][] = $item;
        }

        sort($values);
        $newList = $this->emptyList();
        foreach ($values as $value) {
            $newList->addAll($groups[$value . '']);
        }
        $this->list = $newList;

        return $this;
    }
163
164
    /**
     * Orders the elements by the extractor's value via sortBy() and then reverses the result.
     *
     * @param callable $extractor Passed unchanged to sortBy().
     *
     * @return Stream This stream.
     */
    public function sortByDesc(callable $extractor): Stream
    {
        $ascending = $this->sortBy($extractor);
        $ascending->reverse();

        return $this;
    }
171
172
    /**
     * @inheritDoc
     *
     * Implemented as sort() with the given comparator followed by reverse().
     */
    public function sortDesc(callable $comparator): Stream
    {
        $ascending = $this->sort($comparator);
        $ascending->reverse();

        return $this;
    }
182
183
    /**
     * Replaces the stream content with the same elements in reverse order.
     *
     * @return Stream This stream.
     */
    public function reverse(): Stream
    {
        $backwards = array_reverse($this->list->toArray());

        $this->list = $this->emptyList();
        $this->list->addAll($backwards);

        return $this;
    }
191
192
    /**
     * @inheritDoc
     *
     * The collector receives a copy of the collection returned by getCollection(), never the
     * stream's own list.
     */
    public function collect(callable $collector)
    {
        $snapshot = $this->getCollection()->copy();

        return $collector($snapshot);
    }
199
200
    /**
     * @inheritDoc
     *
     * Picks a position with random_int() and iterates the list up to that position. Returns null
     * for an empty stream.
     */
    public function findAny()
    {
        $count = $this->list->size();
        if ($count === 0) {
            return null;
        }

        /** @noinspection PhpUnhandledExceptionInspection */
        $target = random_int(0, $count - 1);

        $picked = null;
        $position = 0;
        foreach ($this->list as $candidate) {
            $picked = $candidate;
            if ($position === $target) {
                break;
            }
            $position++;
        }

        return $picked;
    }
220
221
    /**
     * @inheritDoc
     *
     * Returns the first element the list iterator yields, or null when it yields nothing.
     */
    public function findFirst()
    {
        $first = null;
        foreach ($this->list as $element) {
            // Only the first iteration is needed.
            $first = $element;
            break;
        }

        return $first;
    }
232
233
    /**
     * @inheritDoc
     *
     * Starts from the first element and replaces it whenever the comparator returns a negative
     * number for (candidate, current). Returns null for an empty stream.
     */
    public function min(callable $comparator)
    {
        $snapshot = $this->getCollection();
        if ($snapshot->size() === 0) {
            return null;
        }

        $candidates = $snapshot->toArray();
        $first = array_shift($candidates);

        return array_reduce(
            $candidates,
            static function ($smallest, $candidate) use ($comparator) {
                return $comparator($candidate, $smallest) < 0 ? $candidate : $smallest;
            },
            $first
        );
    }
255
256
    /**
     * @inheritDoc
     *
     * Starts from the first element and replaces it whenever the comparator returns a positive
     * number for (candidate, current). Returns null only for an empty stream.
     */
    public function max(callable $comparator)
    {
        $collection = $this->getCollection();
        if ($collection->size() === 0) {
            return null;
        }

        $array = $collection->toArray();

        // Seed with a real element, as min() does. Seeding with null would pass null to the
        // comparator and could leave null as the result of a non-empty stream.
        $el = array_shift($array);

        foreach ($array as $item) {
            if ($comparator($item, $el) > 0) {
                $el = $item;
            }
        }

        return $el;
    }
277
278
    /**
     * @inheritDoc
     *
     * The accumulator is called as accumulator(element, carry) for each element in list order;
     * the carry starts as $initialValue and is returned unchanged for an empty stream.
     */
    public function reduce(callable $accumulator, $initialValue = null)
    {
        $carry = $initialValue;
        foreach ($this->list as $element) {
            $carry = $accumulator($element, $carry);
        }

        return $carry;
    }
289
290 10
    /**
     * Returns a copy of the stream's current list, so callers never receive the internal instance.
     *
     * @return Collection
     */
    public function getCollection(): Collection
    {
        $snapshot = $this->list->copy();

        return $snapshot;
    }
294
295 10
    /**
     * Creates the list used whenever the stream content is rebuilt.
     *
     * @return Collection The result of ArrayList::of() called without arguments.
     */
    private function emptyList(): Collection
    {
        $list = ArrayList::of();

        return $list;
    }
299
300
    /**
     * Returns the last element of the list's array form, or null when that array is empty.
     *
     * @return mixed|null
     */
    public function findLast()
    {
        $elements = $this->list->toArray();
        $last = array_pop($elements);

        return $last;
    }
305
306
    /**
     * Calls the consumer for the elements in list order, stopping early on request.
     *
     * Iteration ends when the consumer returns exactly false, or once $limit elements have been visited.
     *
     * @param callable $consumer Called as consumer(element, key) with the key the list iterator yields.
     * @param int|null $limit    Maximum number of elements to visit; null means the list size.
     *                           Zero or a negative number visits nothing.
     *
     * @return Stream This stream.
     */
    public function walk(callable $consumer, ?int $limit = null): Stream
    {
        $iterationsCount = $limit ?? $this->list->size();

        // The limit is otherwise only checked after the consumer call, so without this guard a
        // limit of zero would still visit the first element.
        if ($iterationsCount <= 0) {
            return $this;
        }

        foreach ($this->list as $i => $item) {
            $consumerRes = $consumer($item, $i);
            if ($consumerRes === false) {
                break;
            }
            if ($i + 1 >= $iterationsCount) {
                break;
            }
        }

        return $this;
    }
321
322
    /**
     * Truncates the stream to at most $size leading elements.
     *
     * @param int $size Number of elements to keep; zero or a negative number leaves the stream empty.
     *
     * @return Stream This stream.
     */
    public function limit(int $size): Stream
    {
        $newCollection = $this->emptyList();

        // A non-positive size keeps nothing, so the traversal is skipped entirely.
        if ($size > 0) {
            $this->walk(
                static function ($el) use ($newCollection) {
                    $newCollection->add($el);
                },
                $size
            );
        }

        $this->list = $newCollection;

        return $this;
    }
335
336
    /**
     * Returns this stream when the condition holds, otherwise a DummyStreamDecorator wrapping it.
     *
     * @param bool $condition Decides which of the two is returned.
     *
     * @return Stream
     */
    public function when(bool $condition): Stream
    {
        return $condition ? $this : new DummyStreamDecorator($this);
    }
344
345
    /**
     * Returns this stream unchanged.
     *
     * @return Stream
     */
    public function always(): Stream
    {
        $stream = $this;

        return $stream;
    }
349
}
350