Passed
Pull Request — master (#24)
by Igor
02:35
created

ParallelStream::emptyList()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 1
c 1
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
/**
3
 * @author Igor Pomiluyko
4
 */
5
6
namespace WS\Utils\Collections;
7
8
use Amp\Parallel\Worker\DefaultPool;
9
use Amp\Parallel\Worker\Pool;
10
use RuntimeException;
11
12
use function Amp\ParallelFunctions\parallel;
13
use function Amp\ParallelFunctions\parallelFilter;
14
use function Amp\ParallelFunctions\parallelMap;
15
use function Amp\Promise\all;
16
use function Amp\Promise\wait;
17
18
/**
 * Stream whose element-wise operations (each, filter, map and the key extraction of sortBy)
 * are dispatched through the Amp parallel functions, optionally on a caller-supplied pool.
 *
 * The stream is mutable: intermediate operations replace the internal list and return the
 * same stream instance.
 */
class ParallelStream implements Stream
{
    /**
     * Elements currently held by the stream.
     *
     * @var ListSequence|Collection
     */
    private $list;
    /**
     * Pool handed to the Amp parallel functions; null when the caller did not configure one.
     *
     * @var Pool|null
     */
    private $workersPool;

    /**
     * @param Collection    $collection  Source elements; the stream keeps its own list built from
     *                                   copy() (for a ListSequence) or from toArray() (otherwise).
     * @param Pool|int|null $workersPool A ready pool, or an int that is passed to the DefaultPool
     *                                   constructor. Any other value leaves the pool unset.
     */
    public function __construct(Collection $collection, $workersPool = null)
    {
        if ($collection instanceof ListSequence) {
            $this->list = $collection->copy();
        } else {
            $this->list = $this->emptyList();
            $this->list->addAll($collection->toArray());
        }

        if ($workersPool instanceof Pool) {
            $this->workersPool = $workersPool;
        } elseif (is_int($workersPool)) {
            $this->workersPool = new DefaultPool($workersPool);
        }
    }

    /**
     * Calls the consumer with (element, zero-based position) for every element through
     * parallel() and blocks until all resulting promises are resolved.
     *
     * @inheritDoc
     */
    public function each(callable $consumer): Stream
    {
        // The wrapper does not depend on the element, so it is built once instead of per item.
        $task = parallel($consumer, $this->workersPool);

        $promises = [];
        $position = 0;
        foreach ($this->list as $item) {
            $promises[] = $task($item, $position++);
        }
        wait(all($promises));

        return $this;
    }

    /**
     * Keeps only the elements accepted by the predicate, evaluated via parallelFilter().
     *
     * @inheritDoc
     *
     * @param mixed $workers Not used by this method; kept so the signature stays compatible.
     */
    public function filter(callable $predicate, $workers = null): Stream
    {
        // Await the result before touching the list so a failing worker does not leave the stream empty.
        $accepted = wait(parallelFilter($this->list->toArray(), $predicate, 0, $this->workersPool));

        $this->list = $this->emptyList();
        $this->list->addAll($accepted);

        return $this;
    }

    /**
     * Replaces the stream content with the collection the reorganizer builds from a copy of the current list.
     *
     * @param callable $reorganizer Receives a copy of the current list and must return a Collection.
     *
     * @return Stream This stream.
     *
     * @throws RuntimeException When the reorganizer returns anything other than a Collection.
     */
    public function reorganize(callable $reorganizer): Stream
    {
        $reorganizedCollection = $reorganizer($this->list->copy());
        if (!$reorganizedCollection instanceof Collection) {
            throw new RuntimeException('Result set of reorganizer call must be instance of Collection interface');
        }
        $this->list = $reorganizedCollection;

        return $this;
    }

    /**
     * Evaluated in the calling process; stops at the first element the predicate rejects.
     * An empty stream yields true.
     *
     * @inheritDoc
     */
    public function allMatch(callable $predicate): bool
    {
        foreach ($this->list as $item) {
            if (!$predicate($item)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Evaluated in the calling process; stops at the first element the predicate accepts.
     * An empty stream yields false.
     *
     * @inheritDoc
     */
    public function anyMatch(callable $predicate): bool
    {
        foreach ($this->list as $item) {
            if ($predicate($item)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Replaces every element with the converter's result, computed via parallelMap().
     *
     * @inheritDoc
     */
    public function map(callable $converter): Stream
    {
        // Await the result before touching the list so a failing worker does not leave the stream empty.
        $converted = wait(parallelMap($this->list->toArray(), $converter, $this->workersPool));

        $this->list = $this->emptyList();
        $this->list->addAll($converted);

        return $this;
    }

    /**
     * Sorts the elements with usort() in the calling process.
     *
     * @inheritDoc
     */
    public function sort(callable $comparator): Stream
    {
        $items = $this->list->toArray();
        usort($items, $comparator);

        $this->list = $this->emptyList();
        foreach ($items as $item) {
            $this->list->add($item);
        }

        return $this;
    }

    /**
     * Sorts the elements ascending by the scalar key the extractor returns for each of them.
     * Elements with equal keys keep their current relative order.
     *
     * The keys are taken from the promise returned by parallelMap() rather than gathered through
     * by-reference captures: a callback dispatched through parallel() is serialized and executed
     * by a worker, so writes to captured references never reach the caller.
     *
     * @param callable $extractor Receives one element and must return a scalar.
     *
     * @return Stream This stream.
     *
     * @throws RuntimeException When the extractor returns a non-scalar value.
     */
    public function sortBy(callable $extractor): Stream
    {
        $items = array_values($this->list->toArray());
        $keys = wait(parallelMap($items, $extractor, $this->workersPool));

        foreach ($keys as $key) {
            if (!is_scalar($key)) {
                throw new RuntimeException('Only scalar value can be as result of sort extractor');
            }
        }

        // Sort positions instead of elements; the position tiebreaker keeps the sort stable
        // on PHP versions where usort() itself is not.
        $order = array_keys($items);
        usort(
            $order,
            static function (int $left, int $right) use ($keys): int {
                return ($keys[$left] <=> $keys[$right]) ?: ($left <=> $right);
            }
        );

        $sorted = $this->emptyList();
        foreach ($order as $position) {
            $sorted->add($items[$position]);
        }
        $this->list = $sorted;

        return $this;
    }

    /**
     * Same as sortBy() followed by reverse(), so elements with equal keys end up in reversed order too.
     *
     * @param callable $extractor Receives one element and must return a scalar.
     *
     * @return Stream This stream.
     */
    public function sortByDesc(callable $extractor): Stream
    {
        $this->sortBy($extractor)
            ->reverse();

        return $this;
    }

    /**
     * Same as sort() followed by reverse().
     *
     * @inheritDoc
     */
    public function sortDesc(callable $comparator): Stream
    {
        $this->sort($comparator)
            ->reverse();

        return $this;
    }

    /**
     * Reverses the order of the elements.
     *
     * @return Stream This stream.
     */
    public function reverse(): Stream
    {
        $reversed = array_reverse($this->list->toArray());

        $this->list = $this->emptyList();
        $this->list->addAll($reversed);

        return $this;
    }

    /**
     * Hands a copy of the current elements to the collector and returns whatever it produces.
     *
     * @inheritDoc
     */
    public function collect(callable $collector)
    {
        // getCollection() already returns a copy, so the collector cannot modify the stream's own list.
        return $collector($this->getCollection());
    }

    /**
     * Returns a randomly chosen element, or null when the stream is empty.
     *
     * @inheritDoc
     */
    public function findAny()
    {
        $size = $this->list->size();
        if ($size === 0) {
            return null;
        }

        /** @noinspection PhpUnhandledExceptionInspection */
        $target = random_int(0, $size - 1);
        $position = 0;
        $item = null;
        foreach ($this->list as $item) {
            if ($target === $position++) {
                break;
            }
        }

        return $item;
    }

    /**
     * Returns the first element in iteration order, or null when the stream is empty.
     *
     * @inheritDoc
     */
    public function findFirst()
    {
        /** @noinspection LoopWhichDoesNotLoopInspection */
        foreach ($this->list as $item) {
            return $item;
        }

        return null;
    }

    /**
     * Returns the smallest element according to the comparator, or null when the stream is empty.
     * Among equal candidates the earliest one wins.
     *
     * @inheritDoc
     */
    public function min(callable $comparator)
    {
        if ($this->list->size() === 0) {
            return null;
        }

        $items = $this->list->toArray();
        $smallest = array_shift($items);
        foreach ($items as $item) {
            if ($comparator($item, $smallest) < 0) {
                $smallest = $item;
            }
        }

        return $smallest;
    }

    /**
     * Returns the largest element according to the comparator, or null when the stream is empty.
     * Among equal candidates the earliest one wins.
     *
     * @inheritDoc
     */
    public function max(callable $comparator)
    {
        if ($this->list->size() === 0) {
            return null;
        }

        $items = $this->list->toArray();
        // Seed with the first element so the comparator only ever sees real elements, never null.
        $largest = array_shift($items);
        foreach ($items as $item) {
            if ($comparator($item, $largest) > 0) {
                $largest = $item;
            }
        }

        return $largest;
    }

    /**
     * Folds the elements in iteration order; the accumulator is called as accumulator(element, carry).
     *
     * @inheritDoc
     */
    public function reduce(callable $accumulator, $initialValue = null)
    {
        $accumulate = $initialValue;
        foreach ($this->list as $item) {
            $accumulate = $accumulator($item, $accumulate);
        }

        return $accumulate;
    }

    /**
     * Returns a copy of the elements currently held by the stream.
     *
     * @return Collection
     */
    public function getCollection(): Collection
    {
        return $this->list->copy();
    }

    /**
     * Creates the empty list used whenever the stream rebuilds its content.
     *
     * @return Collection
     */
    private function emptyList(): Collection
    {
        return ArrayList::of();
    }

    /**
     * Returns the last element of the stream, or null when the stream is empty.
     *
     * @return mixed|null
     */
    public function findLast()
    {
        $items = $this->list->toArray();

        return array_pop($items);
    }

    /**
     * Calls the consumer with (element, iteration key) in the calling process, element by element.
     *
     * Iteration stops after $limit elements, or as soon as the consumer returns exactly false.
     * A limit of zero or less visits nothing.
     *
     * @param callable $consumer Called for every visited element.
     * @param int|null $limit    Maximum number of elements to visit; null visits all of them.
     *
     * @return Stream This stream.
     */
    public function walk(callable $consumer, ?int $limit = null): Stream
    {
        $remaining = $limit ?? $this->list->size();
        if ($remaining <= 0) {
            return $this;
        }

        foreach ($this->list as $i => $item) {
            if ($consumer($item, $i) === false) {
                break;
            }
            // Count visits explicitly rather than relying on the iteration key being a 0-based index.
            if (--$remaining <= 0) {
                break;
            }
        }

        return $this;
    }

    /**
     * Truncates the stream to at most $size leading elements; zero or less empties it.
     *
     * @param int $size Maximum number of elements to keep.
     *
     * @return Stream This stream.
     */
    public function limit(int $size): Stream
    {
        $newCollection = $this->emptyList();
        $this->walk(
            static function ($el) use ($newCollection) {
                $newCollection->add($el);
            },
            $size
        );

        $this->list = $newCollection;

        return $this;
    }

    /**
     * Returns this stream when the condition holds, otherwise a DummyStreamDecorator wrapping it.
     *
     * NOTE(review): the decorator presumably ignores operations until always() is called;
     * confirm against DummyStreamDecorator.
     *
     * @param bool $condition
     *
     * @return Stream
     */
    public function when(bool $condition): Stream
    {
        if (!$condition) {
            return new DummyStreamDecorator($this);
        }

        return $this;
    }

    /**
     * Returns this stream unchanged.
     *
     * @return Stream
     */
    public function always(): Stream
    {
        return $this;
    }
}
351