Pipeline::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 0
cts 5
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 2
1
<?php
2
3
/**
4
 * @copyright  Copyright (c) Flipbox Digital Limited
5
 * @license    https://github.com/flipbox/pipeline/blob/master/LICENSE.md
6
 * @link       https://github.com/flipbox/pipeline
7
 */
8
9
namespace Flipbox\Pipeline\Pipelines;
10
11
use Flipbox\Pipeline\Processors\Processor;
12
use Flipbox\Skeleton\Helpers\ArrayHelper;
13
use Flipbox\Skeleton\Helpers\ObjectHelper;
14
use Flipbox\Skeleton\Object\AbstractObject;
15
use League\Pipeline\PipelineInterface;
16
use League\Pipeline\ProcessorInterface;
17
use League\Pipeline\StageInterface;
18
use Psr\Log\InvalidArgumentException;
19
20
/**
21
 * An extensible pipeline
22
 *
23
 * @author Flipbox Digital <[email protected]>
24
 * @since 1.0.0
25
 */
26
class Pipeline extends AbstractObject implements PipelineInterface
27
{
28
    /**
29
     * @var StageInterface[]|callable[]
30
     */
31
    protected $stages = [];
32
33
    /**
34
     * @var ProcessorInterface|null
35
     */
36
    protected $processor = Processor::class;
37
38
    /**
39
     * @inheritdoc
40
     */
41
    public function __construct($config = [])
42
    {
43
        $this->stages = ArrayHelper::remove($config, 'stages', []);
44
        parent::__construct($config);
45
    }
46
47
    /**
48
     * @return callable[]|StageInterface[]|mixed|null
49
     */
50
    protected function getStages()
51
    {
52
        foreach ($this->stages as $key => $stage) {
53
            $this->stages[$key] = $this->resolveStage($stage);
54
        }
55
56
        return $this->stages;
57
    }
58
59
    /**
60
     * @param $stage
61
     * @return callable
62
     * @throws InvalidArgumentException
63
     */
64
    protected function resolveStage($stage): callable
65
    {
66
        if (is_callable($stage)) {
67
            return $stage;
68
        }
69
70
        try {
71
            if (!is_object($stage)) {
72
                return $this->resolveStage(
73
                    ObjectHelper::create($stage)
74
                );
75
            }
76
        } catch (\Exception $e) {
77
            throw new InvalidArgumentException($e->getMessage());
78
        }
79
80
        throw new InvalidArgumentException('All stages should be callable.');
81
    }
82
83
    /**
84
     * @return ProcessorInterface
85
     * @throws InvalidArgumentException
86
     */
87
    public function getProcessor(): ProcessorInterface
88
    {
89
        if (!$this->processor instanceof ProcessorInterface) {
90
            $this->processor = $this->resolveProcessor($this->processor);
91
        }
92
93
        return $this->processor;
94
    }
95
96
    /**
97
     * @param $processor
98
     * @return $this
99
     */
100
    public function setProcessor($processor)
101
    {
102
        $this->processor = $processor;
103
        return $this;
104
    }
105
106
    /**
107
     * @param $processor
108
     * @return ProcessorInterface
109
     * @throws InvalidArgumentException
110
     */
111
    protected function resolveProcessor($processor): ProcessorInterface
112
    {
113
        if ($processor instanceof ProcessorInterface) {
114
            return $processor;
115
        }
116
117
        try {
118
            if (!is_object($processor)) {
119
                return $this->resolveProcessor(
120
                    ObjectHelper::create($processor)
121
                );
122
            }
123
        } catch (\Exception $e) {
124
            throw new InvalidArgumentException($e->getMessage());
125
        }
126
127
        throw new InvalidArgumentException(sprintf(
128
            "Processor must be an instance of '%s'.",
129
            ProcessorInterface::class
130
        ));
131
    }
132
133
    /**
134
     * @inheritdoc
135
     */
136
    public function pipe(callable $stage)
137
    {
138
        $pipeline = clone $this;
139
        $pipeline->stages[] = $stage;
140
141
        return $pipeline;
142
    }
143
144
    /**
145
     * @param mixed $payload
146
     * @param array $extra
147
     * @return mixed
148
     * @throws InvalidArgumentException
149
     */
150
    public function process($payload, $extra = [])
151
    {
152
        if (!is_array($extra)) {
153
            $extra = ['source' => $extra];
154
        }
155
156
        return $this->getProcessor()->process($this->getStages(), $payload, $extra);
157
    }
158
159
    /**
160
     * @inheritdoc
161
     */
162
    public function __invoke($payload, $extra = [])
163
    {
164
        return $this->process($payload, $extra);
165
    }
166
}
167