Completed
Push — master ( b2e536...a4e6dd )
by Arne
02:05
created

AmberjackVaultLayout::writeSynchronization()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 21
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 9.0534
c 0
b 0
f 0
cc 4
eloc 9
nc 3
nop 1
1
<?php
2
3
namespace Storeman\VaultLayout\Amberjack;
4
5
use Ramsey\Uuid\Uuid;
6
use Storeman\Index;
7
use Storeman\IndexObject;
8
use Storeman\StorageAdapter\StorageAdapterInterface;
9
use Storeman\Synchronization;
10
use Storeman\SynchronizationList;
11
use Storeman\VaultConfiguration;
12
use Storeman\VaultLayout\LazyLoadedIndex;
13
use Storeman\VaultLayout\VaultLayoutInterface;
14
15
class AmberjackVaultLayout implements VaultLayoutInterface
16
{
17
    protected const SYNCHRONIZATION_LIST_FILE_NAME = 'sync.log';
18
19
    /**
20
     * @var StorageAdapterInterface
21
     */
22
    protected $storageAdapter;
23
24
    /**
25
     * @var VaultConfiguration
26
     */
27
    protected $vaultConfiguration;
28
29
    public function __construct(StorageAdapterInterface $storageAdapter, VaultConfiguration $vaultConfiguration)
30
    {
31
        $this->storageAdapter = $storageAdapter;
32
        $this->vaultConfiguration = $vaultConfiguration;
33
    }
34
35
    /**
36
     * {@inheritdoc}
37
     */
38
    public function getSynchronizations(): SynchronizationList
39
    {
40
        if ($this->storageAdapter->exists(static::SYNCHRONIZATION_LIST_FILE_NAME))
41
        {
42
            $stream = $this->storageAdapter->getReadStream(static::SYNCHRONIZATION_LIST_FILE_NAME);
43
44
            stream_filter_append($stream, 'zlib.inflate', STREAM_FILTER_READ);
45
46
            $list = new SynchronizationList();
47
48
            while (($row = fgetcsv($stream)) !== false)
49
            {
50
                $synchronization = Synchronization::fromScalarArray($row);
51
                $synchronization->setIndex(new LazyLoadedIndex(function() use ($synchronization) {
52
53
                    return $this->readIndex($synchronization);
54
                }));
55
56
                $list->addSynchronization($synchronization);
57
            }
58
59
            fclose($stream);
60
61
            return $list;
62
        }
63
64
        return new SynchronizationList();
65
    }
66
67
    /**
68
     * {@inheritdoc}
69
     */
70
    public function getLastSynchronization(): ?Synchronization
71
    {
72
        return $this->getSynchronizations()->getLastSynchronization();
73
    }
74
75
    /**
76
     * {@inheritdoc}
77
     */
78
    public function getSynchronization(int $revision): Synchronization
79
    {
80
        return $this->getSynchronizations()->getSynchronization($revision);
81
    }
82
83
    /**
84
     * {@inheritdoc}
85
     */
86
    public function readBlob(string $blobId)
87
    {
88
        $stream = $this->storageAdapter->getReadStream($blobId);
89
90
        return $stream;
91
    }
92
93
    /**
94
     * {@inheritdoc}
95
     */
96
    public function writeSynchronization(Synchronization $synchronization)
97
    {
98
        foreach ($synchronization->getIndex() as $indexObject)
99
        {
100
            /** @var IndexObject $indexObject */
101
102
            if ($indexObject->isFile() && $indexObject->getBlobId() === null)
103
            {
104
                $indexObject->setBlobId($this->generateNewBlobId($synchronization->getIndex()));
105
106
                $this->writeFileIndexObject($indexObject);
107
            }
108
        }
109
110
        $this->writeIndex($synchronization);
111
112
        $synchronizationList = $this->getSynchronizations();
113
        $synchronizationList->addSynchronization($synchronization);
114
115
        $this->writeSynchronizationList($synchronizationList);
116
    }
117
118
    protected function readIndex(Synchronization $synchronization): Index
119
    {
120
        $stream = $this->storageAdapter->getReadStream($this->getIndexFileName($synchronization));
121
122
        stream_filter_append($stream, 'zlib.inflate');
123
124
        $index = new Index();
125
        while (($row = fgetcsv($stream)) !== false)
126
        {
127
            $index->addObject(IndexObject::fromScalarArray($row));
128
        }
129
130
        fclose($stream);
131
132
        return $index;
133
    }
134
135
    protected function writeIndex(Synchronization $synchronization)
136
    {
137
        // write to local temp file
138
        $tempPath = tempnam(sys_get_temp_dir(), 'index');
139
        $stream = fopen($tempPath, 'w+b');
140
        foreach ($synchronization->getIndex() as $object)
141
        {
142
            /** @var IndexObject $object */
143
144
            if (fputcsv($stream, $object->toScalarArray()) === false)
145
            {
146
                throw new \RuntimeException();
147
            }
148
        }
149
        rewind($stream);
150
151
        // upload local file to vault
152
        stream_filter_append($stream, 'zlib.deflate');
153
        $this->storageAdapter->writeStream($this->getIndexFileName($synchronization), $stream);
154
155
        fclose($stream);
156
    }
157
158
    protected function writeFileIndexObject(IndexObject $indexObject)
159
    {
160
        assert($indexObject->isFile());
161
        assert($indexObject->getBlobId() !== null);
162
163
        $localPath = $this->vaultConfiguration->getConfiguration()->getPath() . $indexObject->getRelativePath();
164
165
        $stream = fopen($localPath, 'rb');
166
167
        $this->storageAdapter->writeStream($indexObject->getBlobId(), $stream);
168
    }
169
170
    protected function writeSynchronizationList(SynchronizationList $synchronizationList)
171
    {
172
        // write to local temp file
173
        $tempPath = tempnam(sys_get_temp_dir(), 'synchronizationList');
174
        $stream = fopen($tempPath, 'w+b');
175
        foreach ($synchronizationList as $synchronization)
176
        {
177
            /** @var Synchronization $synchronization */
178
179
            if (fputcsv($stream, $synchronization->toScalarArray()) === false)
180
            {
181
                throw new \RuntimeException();
182
            }
183
        }
184
        rewind($stream);
185
186
        // upload local file to vault
187
        stream_filter_append($stream, 'zlib.deflate');
188
        $this->storageAdapter->writeStream(static::SYNCHRONIZATION_LIST_FILE_NAME, $stream);
189
190
        fclose($stream);
191
    }
192
193
    protected function generateNewBlobId(Index $index = null): string
194
    {
195
        do
196
        {
197
            $blobId = Uuid::uuid4()->toString();
198
        }
199
        while (($index && $index->getObjectByBlobId($blobId)) || $this->storageAdapter->exists($blobId));
200
201
        return $blobId;
202
    }
203
204
    protected function getIndexFileName(Synchronization $synchronization): string
205
    {
206
        return "index_{$synchronization->getRevision()}";
207
    }
208
}
209