Completed
Push — master ( 2a17e7...e610ee )
by Arne
04:45
created

AmberjackVaultLayout::writeFileIndexObject()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 11
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 6
nc 1
nop 1
1
<?php
2
3
namespace Storeman\VaultLayout\Amberjack;
4
5
use Ramsey\Uuid\Uuid;
6
use Storeman\Exception;
7
use Storeman\FileReader;
8
use Storeman\Index\Index;
9
use Storeman\Index\IndexObject;
10
use Storeman\StorageAdapter\StorageAdapterInterface;
11
use Storeman\Synchronization;
12
use Storeman\SynchronizationList;
13
use Storeman\Config\VaultConfiguration;
14
use Storeman\VaultLayout\LazyLoadedIndex;
15
use Storeman\VaultLayout\VaultLayoutInterface;
16
17
class AmberjackVaultLayout implements VaultLayoutInterface
18
{
19
    protected const SYNCHRONIZATION_LIST_FILE_NAME = 'sync.log';
20
21
    /**
22
     * @var StorageAdapterInterface
23
     */
24
    protected $storageAdapter;
25
26
    /**
27
     * @var VaultConfiguration
28
     */
29
    protected $vaultConfiguration;
30
31
    public function __construct(StorageAdapterInterface $storageAdapter, VaultConfiguration $vaultConfiguration)
32
    {
33
        $this->storageAdapter = $storageAdapter;
34
        $this->vaultConfiguration = $vaultConfiguration;
35
    }
36
37
    /**
38
     * {@inheritdoc}
39
     */
40
    public function getSynchronizations(): SynchronizationList
41
    {
42
        if ($this->storageAdapter->exists(static::SYNCHRONIZATION_LIST_FILE_NAME))
43
        {
44
            $stream = $this->storageAdapter->getReadStream(static::SYNCHRONIZATION_LIST_FILE_NAME);
45
46
            stream_filter_append($stream, 'zlib.inflate', STREAM_FILTER_READ);
47
48
            $list = new SynchronizationList();
49
50
            while (is_array($row = fgetcsv($stream)))
51
            {
52
                $synchronization = Synchronization::fromScalarArray($row);
53
                $synchronization->setIndex(new LazyLoadedIndex(function() use ($synchronization) {
54
55
                    return $this->readIndex($synchronization);
56
                }));
57
58
                $list->addSynchronization($synchronization);
59
            }
60
61
            if (!feof($stream))
62
            {
63
                throw new Exception("Corrupt synchronization list detected");
64
            }
65
66
            fclose($stream);
67
68
            return $list;
69
        }
70
71
        return new SynchronizationList();
72
    }
73
74
    /**
75
     * {@inheritdoc}
76
     */
77
    public function getLastSynchronization(): ?Synchronization
78
    {
79
        return $this->getSynchronizations()->getLastSynchronization();
80
    }
81
82
    /**
83
     * {@inheritdoc}
84
     */
85
    public function getSynchronization(int $revision): Synchronization
86
    {
87
        return $this->getSynchronizations()->getSynchronization($revision);
88
    }
89
90
    /**
91
     * {@inheritdoc}
92
     */
93
    public function readBlob(string $blobId)
94
    {
95
        $stream = $this->storageAdapter->getReadStream($blobId);
96
97
        return $stream;
98
    }
99
100
    /**
101
     * {@inheritdoc}
102
     */
103
    public function writeSynchronization(Synchronization $synchronization, FileReader $fileReader)
104
    {
105
        foreach ($synchronization->getIndex() as $indexObject)
106
        {
107
            /** @var IndexObject $indexObject */
108
109
            if ($indexObject->isFile() && $indexObject->getBlobId() === null)
110
            {
111
                $indexObject->setBlobId($this->generateNewBlobId($synchronization->getIndex()));
112
113
                $this->storageAdapter->writeStream($indexObject->getBlobId(), $fileReader->getReadStream($indexObject));
114
            }
115
        }
116
117
        $this->writeIndex($synchronization);
118
119
        $synchronizationList = $this->getSynchronizations();
120
        $synchronizationList->addSynchronization($synchronization);
121
122
        $this->writeSynchronizationList($synchronizationList);
123
    }
124
125
    protected function readIndex(Synchronization $synchronization): Index
126
    {
127
        $stream = $this->storageAdapter->getReadStream($this->getIndexFileName($synchronization));
128
129
        stream_filter_append($stream, 'zlib.inflate');
130
131
        $index = new Index();
132
        while (($row = fgetcsv($stream)) !== false)
133
        {
134
            $index->addObject(IndexObject::fromScalarArray($row));
135
        }
136
137
        fclose($stream);
138
139
        return $index;
140
    }
141
142
    protected function writeIndex(Synchronization $synchronization)
143
    {
144
        // write to local temp file
145
        $stream = tmpfile();
146
        $filterHandle = stream_filter_append($stream, 'zlib.deflate', STREAM_FILTER_WRITE);
147
        foreach ($synchronization->getIndex() as $object)
148
        {
149
            /** @var IndexObject $object */
150
151
            if (fputcsv($stream, $object->toScalarArray()) === false)
152
            {
153
                throw new \RuntimeException();
154
            }
155
        }
156
        stream_filter_remove($filterHandle);
157
        rewind($stream);
158
159
        // upload local file to vault
160
        $this->storageAdapter->writeStream($this->getIndexFileName($synchronization), $stream);
161
162
        fclose($stream);
163
    }
164
165
    protected function writeSynchronizationList(SynchronizationList $synchronizationList)
166
    {
167
        // write to local temp file
168
        $stream = tmpfile();
169
        $filterHandle = stream_filter_append($stream, 'zlib.deflate', STREAM_FILTER_WRITE);
170
        foreach ($synchronizationList as $synchronization)
171
        {
172
            /** @var Synchronization $synchronization */
173
174
            if (fputcsv($stream, $synchronization->toScalarArray()) === false)
175
            {
176
                throw new \RuntimeException();
177
            }
178
        }
179
        stream_filter_remove($filterHandle);
180
        rewind($stream);
181
182
183
        // upload local file to vault
184
        $this->storageAdapter->writeStream(static::SYNCHRONIZATION_LIST_FILE_NAME, $stream);
185
186
        fclose($stream);
187
    }
188
189
    protected function generateNewBlobId(Index $index = null): string
190
    {
191
        do
192
        {
193
            $blobId = Uuid::uuid4()->toString();
194
        }
195
        while (($index && $index->getObjectByBlobId($blobId)) || $this->storageAdapter->exists($blobId));
196
197
        return $blobId;
198
    }
199
200
    protected function getIndexFileName(Synchronization $synchronization): string
201
    {
202
        return "index_{$synchronization->getRevision()}";
203
    }
204
}
205