Completed
Push — master ( 626563...1e5a44 )
by Gabriel
02:16
created

StreamHandler   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 184
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4
Metric Value
wmc 26
lcom 1
cbo 4
dl 0
loc 184
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
B src() 0 20 5
B forStream() 0 22 4
A forStreams() 0 18 2
A getCallback() 0 12 4
A push() 0 4 1
A temp() 0 5 1
A toDir() 0 4 1
A end() 0 13 3
B recoursiveGlob() 0 25 5
1
<?php 
2
/**
3
 * Junty
4
 *
5
 * @author Gabriel Jacinto aka. GabrielJMJ <[email protected]>
6
 * @license MIT License
7
 */
8
9
namespace Junty\Stream;
10
11
use Psr\Http\Message\StreamInterface;
12
use GuzzleHttp\Psr7;
13
use Junty\Stream\Stream;
14
use Junty\Plugin\PluginInterface;
15
use Junty\ToDir\ToDirPlugin;
16
17
class StreamHandler
18
{
19
    private $globs = [];
20
21
    private $toPush = [];
22
23
    private $temp = [];
24
25
    /**
26
     * Provides streams by the pattern passed
27
     *
28
     * @param string|array $accept
29
     *
30
     * @return self
31
     */
32
    public function src($accept) : self
33
    {
34
        if (!is_string($accept) && !is_array($accept)) {
35
            throw new InvalidArgumentException('You can only pass a string pattern or array with patterns');
36
        }
37
38
        if (is_array($accept)) {
39
            $fileGroups = [];
40
41
            foreach ($accept as $pattern) {
42
                $fileGroups[] = $this->recoursiveGlob($pattern, GLOB_ERR);
43
            }
44
45
            $this->globs = call_user_func_array('array_merge', $fileGroups);
0 ignored issues
show
Documentation Bug introduced by
It seems like call_user_func_array('array_merge', $fileGroups) of type * is incompatible with the declared type array of property $globs.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
46
        } else {
47
            $this->globs = $this->recoursiveGlob($accept, GLOB_ERR);
48
        }
49
50
        return $this;
51
    }
52
53
    /**
54
     * Handle each stream
55
     *
56
     * @param callable|PluginInterface $callback
57
     *
58
     * @return self
59
     */
60
    public function forStream($callback) : self
61
    {
62
        $cb = \Closure::bind($this->getCallback($callback), $this);
63
64
        if (count($this->toPush)) {
65
            foreach ($this->toPush as $stream) {
66
                $cb($stream);
67
            }
68
69
            return $this;
70
        }
71
72
        $streams = array_map(function ($file) {
73
            return new Stream(fopen($file, 'r+'));
74
        }, $this->globs);
75
76
        foreach ($streams as $stream) {
77
            $cb($stream);
78
        }
79
80
        return $this;
81
    }
82
83
    /**
84
     * Handle all streams
85
     *
86
     * @param callable|PluginInterface $callback
87
     *
88
     * @return self
89
     */
90
    public function forStreams($callback) : self
91
    {
92
        $cb = \Closure::bind($this->getCallback($callback), $this);
93
94
        if (count($this->toPush)) {
95
            $cb($this->toPush);
96
97
            return $this;
98
        }
99
100
        $streams = array_map(function ($file) {
101
            return new Stream(fopen($file, 'r+'));
102
        }, $this->globs);
103
104
        $cb($streams);
105
106
        return $this;
107
    }
108
109
    private function getCallback($cb) : callable
110
    {
111
        if (!($cb instanceof PluginInterface) && !is_callable($cb)) {
112
            throw new \InvalidArgumentException('Invalid callback type: ' + gettype($cb));
113
        }
114
115
        if ($cb instanceof PluginInterface) {
116
            return $cb->getCallback();
117
        }
118
119
        return $cb;
120
    }
121
122
    /**
123
     * Pushes a stream to be used on destination
124
     *
125
     * @param StreamInterface $stream
126
     */
127
    public function push(StreamInterface $stream)
128
    {
129
        $this->toPush[] = $stream;
130
    }
131
132
    /**
133
     * Creates a temporary stream
134
     * It will be deleted in the end of task execution
135
     *
136
     * @param StreamInterface $stream
137
     */
138
    public function temp(StreamInterface $stream)
139
    {
140
        $this->push($stream);
141
        $this->temp[] = $stream->getMetadata('uri');
142
    }
143
144
    /**
145
     * Sends pushed streams to a directory (plugin)
146
     *
147
     * @param string $dest
148
     *
149
     * @return callable
150
     */
151
    public function toDir(string $dest) : ToDirPlugin
152
    {
153
        return new ToDirPlugin($dest);
154
    }
155
156
    /**
157
     * Cleans up pushed streams and delete indicated temporary files
158
     *
159
     * @return self
160
     */
161
    public function end() : self
162
    {
163
        foreach ($this->toPush as $stream) {
164
            $stream->close();
165
        }
166
167
        foreach ($this->temp as $tempf) {
168
            unlink($tempf);
169
        }
170
171
        $this->toPush = [];
172
        return $this;
173
    }
174
175
    private function recoursiveGlob($pattern, $flags = 0) : array
176
    {
177
        $globs = glob($pattern, $flags);
178
        $hasDir = false;
179
180
        foreach ($globs as $glob) {
181
            if (is_dir($glob)) {
182
                $hasDir = true;
183
            }
184
        }
185
186
        if (!$hasDir) {
187
            return $globs;
188
        }
189
190
        foreach (glob(dirname($pattern).'/*', GLOB_ONLYDIR|GLOB_NOSORT) as $dir) {
191
            $globs = array_merge($globs, $this->recoursiveGlob($dir.'/'.basename($pattern), $flags));
192
        }
193
194
        $globs = array_filter($globs, function ($res) {
195
            return !is_dir($res);
196
        });
197
198
        return $globs;
199
    }
200
}