Completed
Push — master ( 7d5317...877556 )
by Arne
02:32
created

FileReader::getReadStream()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 19
rs 9.6333
c 0
b 0
f 0
cc 2
nc 2
nop 1
1
<?php
2
3
namespace Storeman;
4
5
use Clue\StreamFilter;
6
use Psr\Log\LoggerAwareInterface;
7
use Psr\Log\LoggerAwareTrait;
8
use Psr\Log\NullLogger;
9
use Storeman\Config\Configuration;
10
use Storeman\Hash\AggregateHashAlgorithm;
11
use Storeman\Hash\Algorithm\HashAlgorithmInterface;
12
use Storeman\Index\IndexObject;
13
14
/**
15
 * Provides read streams for index objects.
16
 */
17
class FileReader implements LoggerAwareInterface
18
{
19
    use LoggerAwareTrait;
20
21
    /**
22
     * @var Configuration
23
     */
24
    protected $configuration;
25
26
    /**
27
     * @var HashAlgorithmInterface[]
28
     */
29
    protected $hashFunctions;
30
31
    public function __construct(Configuration $configuration, array $hashFunctions)
32
    {
33
        $this->configuration = $configuration;
34
        $this->hashFunctions = $hashFunctions;
35
        $this->logger = new NullLogger();
36
    }
37
38
    /**
39
     * Returns a readable stream for the given file index object.
40
     *
41
     * @param IndexObject $indexObject
42
     * @return resource
43
     */
44
    public function getReadStream(IndexObject $indexObject)
45
    {
46
        assert($indexObject->isFile());
47
48
        $absolutePath = "{$this->configuration->getPath()}/{$indexObject->getRelativePath()}";
49
50
        $this->logger->debug("Setting up read-stream for {$absolutePath}");
51
52
        $stream = fopen($absolutePath, 'rb');
53
54
        if ($stream === false)
55
        {
56
            throw new Exception("fopen() failed for '{$absolutePath}'");
57
        }
58
59
        $this->setupStreamHashing($stream, $indexObject);
60
61
        return $stream;
62
    }
63
64
    /**
65
     * Sets up transparent file hashing.
66
     *
67
     * @param resource $stream
68
     * @param IndexObject $indexObject
69
     */
70
    protected function setupStreamHashing($stream, IndexObject $indexObject)
71
    {
72
        $knownHashes = iterator_to_array($indexObject->getHashes());
73
        $configuredHashes = $this->configuration->getFileChecksums();
74
75
        if ($missingHashes = array_diff_key($configuredHashes, $knownHashes))
76
        {
77
            $this->logger->debug("Setting up stream hashing for missing hashes: " . implode(',', $missingHashes));
78
79
            $aggregateHashAlgorithm = new AggregateHashAlgorithm(array_intersect_key($this->hashFunctions, array_flip($missingHashes)));
80
            $aggregateHashAlgorithm->initialize();
81
82
            StreamFilter\prepend($stream, function(string $chunk = null) use ($aggregateHashAlgorithm, $indexObject) {
83
84
                // eof
85
                if ($chunk === null)
86
                {
87
                    $hashes = $aggregateHashAlgorithm->finalize();
88
89
                    foreach ($hashes as $algorithm => $hash)
90
                    {
91
                        $indexObject->getHashes()->addHash($algorithm, $hash);
92
                    }
93
                }
94
95
                // digest chunk
96
                else
97
                {
98
                    $aggregateHashAlgorithm->digest($chunk);
99
                }
100
101
                return $chunk;
102
103
            }, STREAM_FILTER_READ);
104
        }
105
    }
106
}
107