Passed
Push — master ( eb17ef...8c52a0 )
by Anton
01:52
created

Transaction::getIndexes()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 1
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Cycle DataMapper ORM
4
 *
5
 * @license   MIT
6
 * @author    Anton Titov (Wolfy-J)
7
 */
8
declare(strict_types=1);
9
10
namespace Cycle\ORM;
11
12
use Cycle\ORM\Command\CommandInterface;
13
use Cycle\ORM\Exception\TransactionException;
14
use Cycle\ORM\Heap\Node;
15
use Cycle\ORM\Transaction\Runner;
16
use Cycle\ORM\Transaction\RunnerInterface;
17
18
/**
19
 * Transaction provides ability to define set of entities to be stored or deleted within one transaction. Transaction
20
 * can operate as UnitOfWork. Multiple transactions can co-exists in one application.
21
 *
22
 * Internally, upon "run", transaction will request mappers to generate graph of linked commands to create, update or
23
 * delete entities.
24
 */
25
final class Transaction implements TransactionInterface
26
{
27
    /** @var ORMInterface */
28
    private $orm;
29
30
    /** @var \SplObjectStorage */
31
    private $known;
32
33
    /** @var array */
34
    private $persist = [];
35
36
    /** @var array */
37
    private $delete = [];
38
39
    /** @var RunnerInterface */
40
    private $runner;
41
42
    /** @var array */
43
    private $indexes = [];
44
45
    /**
46
     * @param ORMInterface         $orm
47
     * @param RunnerInterface|null $runner
48
     */
49
    public function __construct(ORMInterface $orm, RunnerInterface $runner = null)
50
    {
51
        $this->orm = $orm;
52
        $this->known = new \SplObjectStorage();
53
        $this->runner = $runner ?? new Runner();
54
    }
55
56
    /**
57
     * {@inheritdoc}
58
     */
59
    public function persist($entity, int $mode = self::MODE_CASCADE): self
60
    {
61
        if ($this->known->offsetExists($entity)) {
62
            return $this;
63
        }
64
65
        $this->known->offsetSet($entity, true);
66
        $this->persist[] = [$entity, $mode];
67
68
        return $this;
69
    }
70
71
    /**
72
     * {@inheritdoc}
73
     */
74
    public function delete($entity, int $mode = self::MODE_CASCADE): self
75
    {
76
        if ($this->known->offsetExists($entity)) {
77
            return $this;
78
        }
79
80
        $this->known->offsetSet($entity, true);
81
        $this->delete[] = [$entity, $mode];
82
83
        return $this;
84
    }
85
86
    /**
87
     * {@inheritdoc}
88
     */
89
    public function run()
90
    {
91
        try {
92
            $commands = $this->initCommands();
93
94
            while ($commands !== []) {
95
                $pending = [];
96
                $lastExecuted = count($this->runner);
97
98
                foreach ($this->sort($commands) as $wait => $do) {
99
                    if ($wait !== null) {
100
                        if (!in_array($wait, $pending, true)) {
101
                            $pending[] = $wait;
102
                        }
103
104
                        continue;
105
                    }
106
107
                    $this->runner->run($do);
108
                }
109
110
                if (count($this->runner) === $lastExecuted && !empty($pending)) {
111
                    throw new TransactionException("Unable to complete: " . $this->listCommands($pending));
112
                }
113
114
                $commands = $pending;
115
            }
116
        } catch (\Throwable $e) {
117
            $this->runner->rollback();
118
119
            // no calculations must be kept in node states, resetting
120
            // this will keep entity data as it was before transaction run
121
            $this->resetHeap();
122
123
            throw $e;
124
        } finally {
125
            if (!isset($e)) {
126
                // we are ready to commit all changes to our representation layer
127
                $this->syncHeap();
128
            }
129
        }
130
131
        $this->runner->complete();
132
133
        // resetting the scope
134
        $this->persist = $this->delete = [];
135
        $this->known = new \SplObjectStorage();
136
    }
137
138
    /**
139
     * Sync all entity states with generated changes.
140
     */
141
    protected function syncHeap()
142
    {
143
        $heap = $this->orm->getHeap();
144
        foreach ($heap as $e) {
145
            $node = $heap->get($e);
146
147
            // marked as being deleted and has no external claims (GC like approach)
148
            if ($node->getStatus() == Node::SCHEDULED_DELETE && !$node->getState()->hasClaims()) {
149
                $heap->detach($e);
150
                continue;
151
            }
152
153
            // sync the current entity data with newly generated data
154
            $this->orm->getMapper($node->getRole())->hydrate($e, $node->syncState());
155
156
            // reindex the entity
157
            $heap->attach($e, $node, $this->getIndexes($node->getRole()));
0 ignored issues
show
Bug introduced by
It seems like $node can also be of type null; however, parameter $node of Cycle\ORM\Heap\HeapInterface::attach() does only seem to accept Cycle\ORM\Heap\Node, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

157
            $heap->attach($e, /** @scrutinizer ignore-type */ $node, $this->getIndexes($node->getRole()));
Loading history...
158
        }
159
    }
160
161
    /**
162
     * Reset heap to it's initial state and remove all the changes.
163
     */
164
    protected function resetHeap()
165
    {
166
        $heap = $this->orm->getHeap();
167
        foreach ($heap as $e) {
168
            $heap->get($e)->resetState();
169
        }
170
    }
171
172
    /**
173
     * Return flattened list of commands required to store and delete associated entities.
174
     *
175
     * @return array
176
     */
177
    protected function initCommands(): array
178
    {
179
        $commands = [];
180
        foreach ($this->persist as $pair) {
181
            $commands[] = $this->orm->queueStore($pair[0], $pair[1]);
182
        }
183
184
        // other commands?
185
186
        foreach ($this->delete as $pair) {
187
            $commands[] = $this->orm->queueDelete($pair[0], $pair[1]);
188
        }
189
190
        return $commands;
191
    }
192
193
    /**
194
     * Fetch commands which are ready for the execution. Provide ready commands
195
     * as generated value and delayed commands as the key.
196
     *
197
     * @param iterable $commands
198
     * @return \Generator
199
     */
200
    protected function sort(iterable $commands): \Generator
201
    {
202
        /** @var CommandInterface $command */
203
        foreach ($commands as $command) {
204
            if (!$command->isReady()) {
205
                // command or command branch is not ready
206
                yield $command => null;
207
                continue;
208
            }
209
210
            if ($command instanceof \Traversable) {
211
                // deepening (cut-off on first not-ready level)
212
                yield from $this->sort($command);
213
                continue;
214
            }
215
216
            yield null => $command;
217
        }
218
    }
219
220
    /**
221
     * @param array $commands
222
     * @return string
223
     */
224
    private function listCommands(array $commands): string
225
    {
226
        $errors = [];
227
        foreach ($commands as $command) {
228
            // i miss you Go
229
            if (method_exists($command, '__toError')) {
230
                $errors[] = $command->__toError();
231
            } else {
232
                $errors[] = get_class($command);
233
            }
234
        }
235
236
        return join(', ', $errors);
237
    }
238
239
    /**
240
     * Indexable node fields.
241
     *
242
     * @param string $role
243
     * @return array
244
     */
245
    private function getIndexes(string $role): array
246
    {
247
        if (isset($this->indexes[$role])) {
248
            return $this->indexes[$role];
249
        }
250
251
        $pk = $this->orm->getSchema()->define($role, Schema::PRIMARY_KEY);
252
        $keys = $this->orm->getSchema()->define($role, Schema::FIND_BY_KEYS) ?? [];
253
254
        return $this->indexes[$role] = array_merge([$pk], $keys);
255
    }
256
}