Completed
Push — bugfix/clear-entity-manager-af... ( 88025d )
by A.
04:10
created

BufferedEventBus::publish()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 6
rs 9.4285
cc 2
eloc 3
nc 2
nop 1
1
<?php
2
3
/**
4
 * Copyright 2014 SURFnet bv
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18
19
namespace Surfnet\StepupMiddleware\CommandHandlingBundle\EventHandling;
20
21
use Broadway\Domain\DomainEventStreamInterface;
22
use Broadway\Domain\DomainMessage;
23
use Broadway\EventHandling\EventBusInterface;
24
use Broadway\EventHandling\EventListenerInterface;
25
use Doctrine\ORM\EntityManagerInterface;
26
use Exception;
27
28
class BufferedEventBus implements EventBusInterface
29
{
30
    /**
31
     * @var EventListenerInterface[]
32
     */
33
    private $eventListeners = [];
34
35
    /**
36
     * @var DomainMessage[]
37
     */
38
    private $buffer = [];
39
40
    /**
41
     * Flag to ensure only one loop is publishing domain messages from the buffer.
42
     *
43
     * @var bool
44
     */
45
    private $isFlushing = false;
46
47
    /**
48
     * @var EntityManagerInterface
49
     */
50
    private $entityManager;
51
52
    public function __construct(EntityManagerInterface $entityManager)
53
    {
54
        $this->entityManager = $entityManager;
55
    }
56
57
    public function subscribe(EventListenerInterface $eventListener)
58
    {
59
        $this->eventListeners[] = $eventListener;
60
    }
61
62
    public function publish(DomainEventStreamInterface $domainMessages)
63
    {
64
        foreach ($domainMessages as $domainMessage) {
65
            $this->buffer[] = $domainMessage;
66
        }
67
    }
68
69
    /**
70
     * Flushes the buffered domain messages to all event listeners.
71
     */
72
    public function flush()
73
    {
74
        if ($this->isFlushing) {
75
            // If already flushing, we're in a nested pipeline. This means that an event that is currently being
76
            // handled, triggered a command. This command caused events, which are collected in the buffer.
77
            // These events may only be flushed when all current events have been handled.
78
            // Therefore, we return here and check if there are events in the buffer after handling all current events.
79
            return;
80
        }
81
82
        $this->isFlushing = true;
83
84
        // swap the buffer so we can still publish new events, during or after flush
85
        $buffer = $this->buffer;
86
        $this->buffer = [];
87
88
        try {
89
            while ($domainMessage = array_shift($buffer)) {
90
                foreach ($this->eventListeners as $eventListener) {
91
                    $eventListener->handle($domainMessage);
92
                }
93
94
                // After handling an event, clear the entity manager to prevent collisions in Doctrine's object tracking
95
                // This comes with a caveat: event listeners cannot hold references to certain entities between events
96
                $this->entityManager->clear();
97
            }
98
        } catch (Exception $e) {
99
            $this->isFlushing = false;
100
101
            array_splice($this->buffer, 0, 0, $buffer);
102
103
            throw $e;
104
        }
105
106
        $this->isFlushing = false;
107
        unset($buffer);
108
109
        // if during the handling of events new events have been queued, we need to flush them
110
        if (!empty($this->buffer)) {
111
            $this->flush();
112
        }
113
    }
114
}
115