Pipeline   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 63
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 0%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 6
c 1
b 0
f 0
lcom 1
cbo 3
dl 0
loc 63
ccs 0
cts 33
cp 0
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A pipe() 0 14 2
A process() 0 17 2
A __invoke() 0 4 1
1
<?php
2
3
namespace tomzx\Dataflow;
4
5
use tomzx\Dataflow\Processor\Sequential;
6
7
class Pipeline {
8
	/**
9
	 * @var array
10
	 */
11
	private $stages = [];
12
13
	/**
14
	 * @param array $stages
15
	 */
16
	public function __construct(array $stages = [])
17
	{
18
		$this->stages = $stages;
19
	}
20
21
	/**
22
	 * @param callable $stage
23
	 * @return static
24
	 */
25
	public function pipe(callable $stage)
26
	{
27
		$stages = $this->stages;
28
		if ( ! empty($stages)) {
29
			end($this->stages);
30
			$lastStageIndex = key($this->stages);
31
			reset($this->stages);
32
			$stages[] = [$stage, $lastStageIndex];
33
		} else {
34
			$stages[] = [$stage];
35
		}
36
37
		return new static($stages);
38
	}
39
40
	/**
41
	 * @param mixed|null $payload
42
	 * @return mixed|null
43
	 */
44
	public function process($payload = null)
45
	{
46
		if (empty($this->stages)) {
47
			return null;
48
		}
49
50
		// Call all stages which depend only on the input
51
		$graph = new Graph($this->stages, new Sequential());
52
53
		$results = $graph->process($payload);
54
55
		end($this->stages);
56
		$lastStageIndex = key($this->stages);
57
		reset($this->stages);
58
59
		return $results->output($lastStageIndex);
60
	}
61
62
	/**
63
	 * @param mixed $payload
64
	 */
65
	public function __invoke($payload)
66
	{
67
		$this->process($payload);
68
	}
69
}
70