Issues (6)

src/Stream/Stream.php (1 issue)

Severity
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Blixit\EventSourcing\Stream;
6
7
use Blixit\EventSourcing\Event\EventAccessor;
8
use Blixit\EventSourcing\Event\EventInterface;
9
use Countable;
10
use SplQueue;
11
use function call_user_func;
12
13
class Stream implements Countable
14
{
15
    /** @var StreamName $streamName */
16
    private $streamName;
17
18
    /** @var SplQueue $queue */
19
    private $queue;
20
21
    /** @var int $lastEnqueuedSequenceNumber */
22
    private $lastEnqueuedSequenceNumber;
23
24
    /** @var callable $beforeEnqueue */
25
    private $beforeEnqueue;
26
27
    /**
28
     * Event accessor doesnt require to create many instances if
29
     * many instances of stream are created. The same accessor will be used.
30
     *
31
     * @var EventAccessor $eventAccessor
32
     */
33
    private static $eventAccessor;
34
35
    /**
36
     * @param EventInterface[] $events
37
     */
38
    public function __construct(
39
        StreamName $streamName,
40
        ?array $events = [],
41
        ?callable $beforeEnqueue = null
42
    ) {
43
        if (empty(self::$eventAccessor)) {
44
            // only one item of the event accessor should be instantiated
45
            self::$eventAccessor = EventAccessor::getInstance();
46
        }
47
48
        if (empty($beforeEnqueue)) {
49
            $beforeEnqueue = static function (EventInterface $event) : void {}; // phpcs:ignore
0 ignored issues
show
The parameter $event is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

49
            $beforeEnqueue = static function (/** @scrutinizer ignore-unused */ EventInterface $event) : void {}; // phpcs:ignore

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
50
        }
51
52
        $this->queue         = new SplQueue();
53
        $this->streamName    = $streamName;
54
        $this->beforeEnqueue = $beforeEnqueue;
55
56
        foreach ($events as $event) {
57
            $this->enqueue($event);
58
        }
59
    }
60
61
    public function getStreamName() : ?StreamName
62
    {
63
        return $this->streamName;
64
    }
65
66
    public function setStreamName(StreamName $streamName) : void
67
    {
68
        $this->streamName = $streamName;
69
    }
70
71
    public function dequeue() : EventInterface
72
    {
73
        return $this->queue->dequeue();
74
    }
75
76
    /**
77
     * @throws StreamNotOrderedFailure
78
     */
79
    public function enqueue(EventInterface $event) : void
80
    {
81
        call_user_func($this->beforeEnqueue, $event);
82
        self::$eventAccessor->setStreamName($event, (string) $this->streamName);
83
84
        if (! isset($this->lastEnqueuedSequenceNumber)) {
85
            $this->lastEnqueuedSequenceNumber = $event->getSequence();
86
        }
87
88
        // check sequence order integrity
89
        if ($event->getSequence() < $this->lastEnqueuedSequenceNumber) {
90
            throw new StreamNotOrderedFailure($this->streamName);
91
        }
92
93
        $this->queue->enqueue($event);
94
    }
95
96
    /**
97
     * Count elements of an object
98
     */
99
    public function count() : int
100
    {
101
        return $this->queue->count();
102
    }
103
104
    public function getIterator() : SplQueue
105
    {
106
        return $this->queue;
107
    }
108
}
109