Pipeline::pipe()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 14
ccs 0
cts 13
cp 0
rs 9.4286
cc 2
eloc 10
nc 2
nop 1
crap 6
1
<?php
2
3
namespace tomzx\Dataflow;
4
5
use tomzx\Dataflow\Processor\Sequential;
6
7
/**
 * Immutable, ordered collection of stages that is executed through a Graph
 * using the Sequential processor.
 *
 * Adding a stage with pipe() never modifies the current instance; it returns
 * a new pipeline holding the extended stage list.
 */
class Pipeline {
	/**
	 * Stage definitions, kept in the order they were added.
	 *
	 * Entries created by pipe() are arrays whose first element is the stage
	 * callable; every entry after the first also carries the key of the stage
	 * that was last in the list when it was added.
	 *
	 * @var array
	 */
	private $stages = [];

	/**
	 * @param array $stages Initial stage definitions, stored as given.
	 */
	public function __construct(array $stages = [])
	{
		$this->stages = $stages;
	}

	/**
	 * Returns a new pipeline with $stage appended after the current stages.
	 *
	 * When the pipeline already has stages, the new entry also records the key
	 * of the current last stage (presumably so that Graph treats that stage as
	 * the new stage's dependency -- confirm against Graph).
	 *
	 * @param callable $stage
	 * @return static
	 */
	public function pipe(callable $stage)
	{
		$stages = $this->stages;

		if (empty($stages)) {
			$stages[] = [$stage];
		} else {
			// Read the key before appending so it refers to the previous stage.
			$stages[] = [$stage, $this->lastStageKey()];
		}

		return new static($stages);
	}

	/**
	 * Runs the payload through the stage graph and returns the output of the
	 * last stage.
	 *
	 * @param mixed|null $payload
	 * @return mixed|null Null when the pipeline has no stages.
	 */
	public function process($payload = null)
	{
		if (empty($this->stages)) {
			return null;
		}

		$graph = new Graph($this->stages, new Sequential());
		$results = $graph->process($payload);

		return $results->output($this->lastStageKey());
	}

	/**
	 * Allows the pipeline to be used as a callable, e.g. as a stage of another
	 * pipeline.
	 *
	 * @param mixed $payload
	 * @return mixed|null The result of process(); previously the result was
	 *                    discarded and the call always evaluated to null.
	 */
	public function __invoke($payload)
	{
		return $this->process($payload);
	}

	/**
	 * Returns the key of the last stage, or null when there are no stages.
	 *
	 * Works on a local copy so the internal array pointer of the property is
	 * left untouched.
	 *
	 * @return int|string|null
	 */
	private function lastStageKey()
	{
		$stages = $this->stages;
		end($stages);

		return key($stages);
	}
}
70