Completed
Push — master ( 76b8a7...47120d )
by Miguel
20s
created

Subscriber::batch()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
namespace Cmp\Queues\Domain\Event;
3
4
use Cmp\Queues\Domain\Event\Exception\DomainEventException;
5
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
6
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
7
use Cmp\Queues\Domain\Queue\QueueReader;
8
use Psr\Log\LoggerInterface;
9
10
class Subscriber
11
{
12
    /**
13
     * @var QueueReader
14
     */
15
    protected $queueReader;
16
17
    /**
18
     * @var LoggerInterface
19
     */
20
    protected $logger;
21
22
    /**
23
     * @var EventSubscriptor[]
24
     */
25
    protected $subscriptors = [];
26
27
    /**
28
     * Subscriber constructor.
29
     * @param QueueReader $queueReader
30
     * @param LoggerInterface $logger
31
     */
32
    public function __construct(QueueReader $queueReader, LoggerInterface $logger)
33
    {
34
        $this->queueReader = $queueReader;
35
        $this->logger = $logger;
36
    }
37
38
    /**
39
     * @param EventSubscriptor $eventSubscriptor
40
     * @return $this
41
     */
42
    public function subscribe(EventSubscriptor $eventSubscriptor)
43
    {
44
        $this->subscriptors[] = $eventSubscriptor;
45
        return $this;
46
    }
47
48
    /**
49
     * @param int $timeout
50
     * @throws DomainEventException
51
     * @throws \Cmp\Queues\Domain\Queue\Exception\ReaderException
52
     */
53
    public function start($timeout=0)
54
    {
55
        $this->checkHasSubscriptors();
56
57
        while(true) {
58
            try {
59
                $this->queueReader->read(array($this, 'notify'), $timeout);
60
            } catch(TimeoutReaderException $e) {
61
                break;
62
            } catch(GracefulStopException $e) {
63
                break;
64
            }
65
        }
66
    }
67
68
    /**
69
     * @param int $timeout
70
     * @throws DomainEventException
71
     * @throws \Cmp\Queues\Domain\Queue\Exception\ReaderException
72
     */
73
    public function batch($timeout=0)
74
    {
75
        $this->checkHasSubscriptors();
76
77
        $this->queueReader->read(array($this, 'notify'), $timeout);
78
    }
79
80
    /**
81
     * @param DomainEvent $domainEvent
82
     */
83
    public function notify(DomainEvent $domainEvent)
84
    {
85
        $this->logger->debug('Domain Event received, notifying subscribers');
86
        foreach($this->subscriptors as $subscriptor) {
87
            if($subscriptor->isSubscribed($domainEvent)) {
88
                $subscriptor->notify($domainEvent);
89
            }
90
        }
91
    }
92
93
    /**
94
     * @return EventSubscriptor[]
95
     */
96
    public function getSubscriptors()
97
    {
98
        return $this->subscriptors;
99
    }
100
101
    /**
102
     * @throws DomainEventException
103
     */
104
    private function checkHasSubscriptors()
105
    {
106
        if(empty($this->getSubscriptors())) {
107
            throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
108
        }
109
    }
110
}