Completed
Push — master ( 1e5a44...44a0d0 )
by Gabriel
02:15
created

StreamHandler   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 208
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4
Metric Value
wmc 35
lcom 1
cbo 4
dl 0
loc 208
rs 9

9 Methods

Rating   Name   Duplication   Size   Complexity  
C src() 0 43 14
B forStream() 0 22 4
A forStreams() 0 18 2
A getCallback() 0 12 4
A push() 0 4 1
A temp() 0 5 1
A toDir() 0 4 1
A end() 0 13 3
B recoursiveGlob() 0 25 5
1
<?php 
2
/**
3
 * Junty
4
 *
5
 * @author Gabriel Jacinto aka. GabrielJMJ <[email protected]>
6
 * @license MIT License
7
 */
8
9
namespace Junty\Stream;
10
11
use Psr\Http\Message\StreamInterface;
12
use GuzzleHttp\Psr7;
13
use Junty\Stream\Stream;
14
use Junty\Plugin\PluginInterface;
15
use Junty\ToDir\ToDirPlugin;
16
17
class StreamHandler
18
{
19
    private $globs = [];
20
21
    private $toPush = [];
22
23
    private $temp = [];
24
25
    /**
26
     * Provides streams by the pattern passed
27
     *
28
     * @param string|array      $accept
29
     * @param string|array|null $exclude
30
     *
31
     * @return self
32
     */
33
    public function src($accept, $exclude = null) : self
34
    {
35
        if ((!is_string($accept) && !is_array($accept)) || ($exclude !== null && !is_string($exclude) && !is_array($exclude))) {
36
            throw new \InvalidArgumentException('You can only pass a string pattern or array with patterns');
37
        }
38
39
        if (is_array($accept)) {
40
            $fileGroups = [];
41
42
            foreach ($accept as $pattern) {
43
                $fileGroups[] = $this->recoursiveGlob($pattern, GLOB_ERR);
44
            }
45
46
            $this->globs = call_user_func_array('array_merge', $fileGroups);
0 ignored issues
show
Documentation Bug introduced by
It seems like call_user_func_array('array_merge', $fileGroups) of type * is incompatible with the declared type array of property $globs.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
47
        } else {
48
            $this->globs = $this->recoursiveGlob($accept, GLOB_ERR);
49
        }
50
51
        if ($exclude !== null) {
52
            $cbFilter = function () {return true;};
53
54
            if (is_array($exclude) && count($exclude) > 0) {
55
                $cbFilter = function ($glob) use ($exclude) {
56
                    foreach ($exclude as $pattern) {
57
                        if (preg_match($pattern, $glob)) {
58
                            return false;
59
                        }
60
                    }
61
62
                    return true;
63
                };echo 'a';
64
            } elseif (is_string($exclude)) {
65
                echo 'a';
66
                $cbFilter = function ($glob) use ($exclude) {
67
                    return !preg_match($exclude, $glob);
68
                };
69
            }
70
71
            $this->globs = array_filter($this->globs, $cbFilter);
72
        }
73
74
        return $this;
75
    }
76
77
    /**
78
     * Handle each stream
79
     *
80
     * @param callable|PluginInterface $callback
81
     *
82
     * @return self
83
     */
84
    public function forStream($callback) : self
85
    {
86
        $cb = \Closure::bind($this->getCallback($callback), $this);
87
88
        if (count($this->toPush)) {
89
            foreach ($this->toPush as $stream) {
90
                $cb($stream);
91
            }
92
93
            return $this;
94
        }
95
96
        $streams = array_map(function ($file) {
97
            return new Stream(fopen($file, 'r+'));
98
        }, $this->globs);
99
100
        foreach ($streams as $stream) {
101
            $cb($stream);
102
        }
103
104
        return $this;
105
    }
106
107
    /**
108
     * Handle all streams
109
     *
110
     * @param callable|PluginInterface $callback
111
     *
112
     * @return self
113
     */
114
    public function forStreams($callback) : self
115
    {
116
        $cb = \Closure::bind($this->getCallback($callback), $this);
117
118
        if (count($this->toPush)) {
119
            $cb($this->toPush);
120
121
            return $this;
122
        }
123
124
        $streams = array_map(function ($file) {
125
            return new Stream(fopen($file, 'r+'));
126
        }, $this->globs);
127
128
        $cb($streams);
129
130
        return $this;
131
    }
132
133
    private function getCallback($cb) : callable
134
    {
135
        if (!($cb instanceof PluginInterface) && !is_callable($cb)) {
136
            throw new \InvalidArgumentException('Invalid callback type: ' + gettype($cb));
137
        }
138
139
        if ($cb instanceof PluginInterface) {
140
            return $cb->getCallback();
141
        }
142
143
        return $cb;
144
    }
145
146
    /**
147
     * Pushes a stream to be used on destination
148
     *
149
     * @param StreamInterface $stream
150
     */
151
    public function push(StreamInterface $stream)
152
    {
153
        $this->toPush[] = $stream;
154
    }
155
156
    /**
157
     * Creates a temporary stream
158
     * It will be deleted in the end of task execution
159
     *
160
     * @param StreamInterface $stream
161
     */
162
    public function temp(StreamInterface $stream)
163
    {
164
        $this->push($stream);
165
        $this->temp[] = $stream->getMetadata('uri');
166
    }
167
168
    /**
169
     * Sends pushed streams to a directory (plugin)
170
     *
171
     * @param string $dest
172
     *
173
     * @return callable
174
     */
175
    public function toDir(string $dest) : ToDirPlugin
176
    {
177
        return new ToDirPlugin($dest);
178
    }
179
180
    /**
181
     * Cleans up pushed streams and delete indicated temporary files
182
     *
183
     * @return self
184
     */
185
    public function end() : self
186
    {
187
        foreach ($this->toPush as $stream) {
188
            $stream->close();
189
        }
190
191
        foreach ($this->temp as $tempf) {
192
            unlink($tempf);
193
        }
194
195
        $this->toPush = [];
196
        return $this;
197
    }
198
199
    private function recoursiveGlob($pattern, $flags = 0) : array
200
    {
201
        $globs = glob($pattern, $flags);
202
        $hasDir = false;
203
204
        foreach ($globs as $glob) {
205
            if (is_dir($glob)) {
206
                $hasDir = true;
207
            }
208
        }
209
210
        if (!$hasDir) {
211
            return $globs;
212
        }
213
214
        foreach (glob(dirname($pattern).'/*', GLOB_ONLYDIR|GLOB_NOSORT) as $dir) {
215
            $globs = array_merge($globs, $this->recoursiveGlob($dir.'/'.basename($pattern), $flags));
216
        }
217
218
        $globs = array_filter($globs, function ($res) {
219
            return !is_dir($res);
220
        });
221
222
        return $globs;
223
    }
224
}